diff --git a/continew-starter-bom/pom.xml b/continew-starter-bom/pom.xml index ce812e6a..e3c048cb 100644 --- a/continew-starter-bom/pom.xml +++ b/continew-starter-bom/pom.xml @@ -190,6 +190,13 @@ ${revision} + + + top.continew.starter + continew-starter-messaging-mqtt + ${revision} + + top.continew.starter diff --git a/continew-starter-core/src/main/java/top/continew/starter/core/constant/PropertiesConstants.java b/continew-starter-core/src/main/java/top/continew/starter/core/constant/PropertiesConstants.java index 0f2e8644..5eb3f7cf 100644 --- a/continew-starter-core/src/main/java/top/continew/starter/core/constant/PropertiesConstants.java +++ b/continew-starter-core/src/main/java/top/continew/starter/core/constant/PropertiesConstants.java @@ -124,6 +124,11 @@ public class PropertiesConstants { */ public static final String MESSAGING_WEBSOCKET = MESSAGING + StringConstants.DOT + "websocket"; + /** + * MQTT 配置 + */ + public static final String MESSAGING_MQTT = MESSAGING + StringConstants.DOT + "mqtt"; + /** * 日志配置 */ diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/pom.xml b/continew-starter-messaging/continew-starter-messaging-mqtt/pom.xml new file mode 100644 index 00000000..16de057f --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + + top.continew.starter + continew-starter-messaging + ${revision} + + + + continew-starter-messaging-mqtt + jar + + ${project.artifactId} + ContiNew Starter 消息模块 - MQTT + + + + + org.springframework.integration + spring-integration-mqtt + + + + + diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/annotation/MqttListener.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/annotation/MqttListener.java new file mode 100644 index 00000000..841bc6c3 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/annotation/MqttListener.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.annotation; + +import org.springframework.stereotype.Component; + +import java.lang.annotation.*; + +/** + * mqtt topic 监听器 + * + * @author echo + * @since 2.15.0 + */ +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Inherited +@Component +public @interface MqttListener { + + /** + * 要监听的 MQTT 主题 + *

支持以下配置方式: + *

{@code
+     * 方式1: 直接指定主题
+     * @MqttListener(topic = "sensor/temperature")
+     *
+     * 方式2: 使用配置文件占位符
+     * 
+     * @MqttListener(topic = "${mqtt.topic}")
+     *
+     *                     方式3: 使用通配符
+     * @MqttListener(topic = "sensor/+/temperature") // 单级通配符
+     * @MqttListener(topic = "sensor/#") // 多级通配符
+     *                     }
+ * + *

通配符说明: + *

+ * + * @return MQTT 主题字符串或配置占位符表达式 + */ + String topic(); + + /** + * QoS - 消息传输可靠性等级 + *

支持以下配置方式: + *

+ *

QoS 等级说明: + *

+ * + * @return QoS 等级字符串或配置占位符,默认为 "0" + */ + String qos() default "0"; +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/MqttAutoConfiguration.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/MqttAutoConfiguration.java new file mode 100644 index 00000000..d4e41df5 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/MqttAutoConfiguration.java @@ -0,0 +1,336 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.autoconfigure; + +import cn.hutool.core.util.ObjectUtil; +import jakarta.annotation.PostConstruct; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.core.env.Environment; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.ExecutorChannel; +import org.springframework.integration.gateway.GatewayProxyFactoryBean; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.StringUtils; +import top.continew.starter.core.constant.PropertiesConstants; +import top.continew.starter.core.constant.StringConstants; +import top.continew.starter.messaging.mqtt.autoconfigure.properties.*; +import top.continew.starter.messaging.mqtt.constant.MqttConstant; +import top.continew.starter.messaging.mqtt.handler.MqttMessageInboundHandler; +import top.continew.starter.messaging.mqtt.handler.MqttShutdownHandler; +import top.continew.starter.messaging.mqtt.msg.MqttMessageConsumer; +import top.continew.starter.messaging.mqtt.msg.MqttMessageProducer; +import top.continew.starter.messaging.mqtt.strategy.MqttOptions; +import top.continew.starter.messaging.mqtt.strategy.MqttTemplate; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * MQTT 自动配置类 + *

+ * 用于配置 MQTT 连接参数、入站/出站通道及消息处理器等组件。 + *

+ * + * @author echo + * @since 2.15.0 + */ +@AutoConfiguration +@EnableConfigurationProperties(MqttProperties.class) +@ConditionalOnProperty(prefix = PropertiesConstants.MESSAGING_MQTT, value = PropertiesConstants.ENABLED, havingValue = "true") +public class MqttAutoConfiguration { + + private static final Logger log = LoggerFactory.getLogger(MqttAutoConfiguration.class); + + private final MqttProperties mqttProperties; + + public MqttAutoConfiguration(MqttProperties mqttProperties) { + this.mqttProperties = mqttProperties; + } + + /** + * 配置 MQTT 客户端连接选项 + *

+ * 该方法创建并配置 {@link MqttConnectOptions} 实例,用于建立 MQTT 客户端与服务器的连接。 + * 包含认证信息、连接参数、重连策略、SSL/TLS 配置以及遗嘱消息等完整配置。 + *

+ * + * @return 配置完成的 MQTT 连接选项对象 + * @see MqttConnectOptions + * @see MqttProperties + */ + @Bean + public MqttConnectOptions mqttConnectOptions() { + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + + // 设置 MQTT 服务器地址,支持 tcp://host:port 或 ssl://host:port 格式 + mqttConnectOptions.setServerURIs(new String[] {mqttProperties.getHost()}); + + // 设置用户名(用于认证) + mqttConnectOptions.setUserName(mqttProperties.getUsername()); + + // 设置密码(转为 char[] 以增强安全性) + mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); + + // 设置连接超时时间(单位:秒),默认 30 秒 + mqttConnectOptions.setConnectionTimeout(mqttProperties.getConnectionTimeout()); + + // 设置心跳间隔(单位:秒),用于维持长连接,默认 60 秒 + mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); + + // 设置是否清除会话 + // true:每次连接时清除上次会话状态 + // false:保留订阅信息与未确认消息 + mqttConnectOptions.setCleanSession(mqttProperties.getCleanSession()); + + // 启用自动重连机制,默认 true + mqttConnectOptions.setAutomaticReconnect(mqttProperties.getAutomaticReconnect()); + + // 设置最大重连延迟(单位:毫秒),防止频繁重连,默认 128000 毫秒 + mqttConnectOptions.setMaxReconnectDelay(mqttProperties.getMaxReconnectDelay()); + + // 设置最大允许未确认的 QoS>0 消息数量,控制并发发送能力,默认 10 + mqttConnectOptions.setMaxInflight(mqttProperties.getMaxInflight()); + + // 设置自定义 WebSocket 请求头(仅在使用 ws:// 或 wss:// 协议时生效) + mqttConnectOptions.setCustomWebSocketHeaders(mqttProperties.getCustomWebSocketHeaders()); + + // 启用 HTTPS 主机名验证(适用于 SSL/TLS) + mqttConnectOptions.setHttpsHostnameVerificationEnabled(mqttProperties.getHttpsHostnameVerificationEnabled()); + + // 设置 SSL 连接所需的客户端属性,如证书、密钥、信任库等 + mqttConnectOptions.setSSLProperties(mqttProperties.getSslClientProps()); + + // 设置关闭 ExecutorService 的超时时间(单位:秒),默认 1 秒 + mqttConnectOptions.setExecutorServiceTimeout(mqttProperties.getExecutorServiceTimeout()); + + // 设置遗嘱消息(当客户端异常断开连接时由服务器自动发送) + MqttWillProperties will = mqttProperties.getWill(); + if (ObjectUtil.isNotEmpty(will)) { + // 设置遗嘱主题 + // 设置遗嘱消息内容(字节数组) + // 设置 QoS 等级 + // 设置是否保留该消息(true 表示新订阅者会收到) + mqttConnectOptions.setWill(will.getTopic(), will.getPayload().getBytes(), will.getQos(), will + .getRetained()); + } + + return mqttConnectOptions; + } + + /** + * 配置 MQTT 客户端工厂,关联连接参数。 + */ + @Bean + public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) { + DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory(); + clientFactory.setConnectionOptions(mqttConnectOptions); + return clientFactory; + } + + /** + * 配置入站消息通道。 + * - 若 consumer.async = true,则使用线程池处理(异步) + * - 否则使用 DirectChannel(同步) + */ + @Bean(name = MqttConstant.MQTT_INPUT_CHANNEL_NAME) + public MessageChannel mqttInputChannel() { + MqttConsumerProperties consumer = mqttProperties.getConsumer(); + Boolean async = consumer.getAsync(); + if (Boolean.TRUE.equals(async)) { + return new ExecutorChannel(mqttConsumerExecutor()); + } else { + return new DirectChannel(); + } + } + + /** + * 配置 MQTT 消息消费处理通道(队列模式),供消费者拉取使用。 + */ + @Bean(name = MqttConstant.MQTT_OUT_BOUND_CHANNEL_NAME) + public MessageChannel consumerChannel() { + MqttProducerProperties producer = mqttProperties.getProducer(); + Boolean async = producer.getAsync(); + + if (Boolean.TRUE.equals(async)) { + return new ExecutorChannel(mqttProducerExecutor()); + } else { + return new DirectChannel(); + } + } + + /** + * 配置 MQTT 出站通道,向 broker 发送消息。 + */ + @Bean(name = MqttConstant.CONSUMER_CHANNEL_NAME) + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); + } + + /** + * 配置 MQTT 入站适配器,接收来自 MQTT broker 的消息。 + */ + @Bean + public MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter(MqttPahoClientFactory mqttPahoClientFactory, + @Qualifier(MqttConstant.MQTT_INPUT_CHANNEL_NAME) MessageChannel mqttInputChannel, + Environment environment) { + + MqttConsumerProperties consumer = mqttProperties.getConsumer(); + String clientId = consumer.getClientId(); + if (!StringUtils.hasText(clientId)) { + clientId = getClientId(environment); + } + + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttPahoClientFactory); + adapter.setAutoStartup(consumer.getAutoStartUp()); + adapter.setOutputChannel(mqttInputChannel); + adapter.setQos(consumer.getQos()); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setCompletionTimeout(consumer.getCompletionTimeout()); + adapter.setDisconnectCompletionTimeout(consumer.getDisconnectCompletionTimeout()); + return adapter; + } + + /** + * 配置 MQTT 出站消息处理器,用于发布消息。 + */ + @Bean + @ServiceActivator(inputChannel = MqttConstant.MQTT_OUT_BOUND_CHANNEL_NAME) + public MqttPahoMessageHandler mqttOutbound(MqttPahoClientFactory mqttPahoClientFactory, Environment environment) { + MqttProducerProperties producer = mqttProperties.getProducer(); + String clientId = producer.getClientId(); + if (!StringUtils.hasText(clientId)) { + clientId = getClientId(environment); + } + + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttPahoClientFactory); + messageHandler.setAsync(producer.getAsync()); + messageHandler.setAsyncEvents(producer.getAsyncEvents()); + messageHandler.setDefaultTopic(producer.getDefaultTopic()); + messageHandler.setDefaultQos(producer.getDefaultQos()); + messageHandler.setConverter(new DefaultPahoMessageConverter()); + messageHandler.setDefaultRetained(producer.getDefaultRetained()); + return messageHandler; + } + + /** + * 构造封装的 MqttTemplate 工具类,提供更易用的发送/订阅能力。 + */ + @Bean + public MqttOptions mqttOptions(MqttPahoMessageDrivenChannelAdapter adapter) { + return new MqttTemplate(adapter); + } + + /** + * 构造入站消息处理器,分发到自定义监听器中。 + */ + @Bean + public MqttMessageInboundHandler mqttMessageInboundHandler(List messageListeners, + MqttOptions mqttOptions, + Environment environment) { + return new MqttMessageInboundHandler(messageListeners, mqttOptions, environment); + } + + /** + * 配置 MQTT 消息生产者网关 + */ + @Bean + public GatewayProxyFactoryBean mqttMessageProducer(@Qualifier(MqttConstant.MQTT_OUT_BOUND_CHANNEL_NAME) MessageChannel outboundChannel) { + GatewayProxyFactoryBean factoryBean = new GatewayProxyFactoryBean<>(MqttMessageProducer.class); + factoryBean.setDefaultRequestChannel(outboundChannel); + return factoryBean; + } + + /** + * 消费者异步线程池 + */ + public ThreadPoolTaskExecutor mqttConsumerExecutor() { + MqttExecutorProperties executorProperties = mqttProperties.getConsumer().getExecutor(); + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(executorProperties.getCorePoolSize()); + executor.setMaxPoolSize(executorProperties.getMaxPoolSize()); + executor.setQueueCapacity(executorProperties.getQueueCapacity()); + executor.setKeepAliveSeconds(executorProperties.getKeepAliveSeconds()); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setThreadNamePrefix("mqtt-consumer-"); + executor.initialize(); + return executor; + } + + /** + * 生产者异步线程池 + * + * @return {@link ThreadPoolTaskExecutor } + */ + public ThreadPoolTaskExecutor mqttProducerExecutor() { + MqttExecutorProperties executorProperties = mqttProperties.getProducer().getExecutor(); + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(executorProperties.getCorePoolSize()); + executor.setMaxPoolSize(executorProperties.getMaxPoolSize()); + executor.setQueueCapacity(executorProperties.getQueueCapacity()); + executor.setKeepAliveSeconds(executorProperties.getKeepAliveSeconds()); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setThreadNamePrefix("mqtt-producer-"); + executor.initialize(); + return executor; + } + + /** + * 自动生成客户端 ID,避免冲突。 + */ + private String getClientId(Environment environment) { + String applicationName = environment.getProperty("spring.application.name", "mqtt"); + return applicationName + StringConstants.DASHED + UUID.randomUUID() + .toString() + .replace(StringConstants.DASHED, ""); + } + + /** + * mqtt关闭处理程序 + * + * @param inboundAdapter Mqtt Paho消息驱动通道适配器 + * @param outboundHandler MQTT Paho 消息处理器 + * @return {@link MqttShutdownHandler } + */ + @Bean + public MqttShutdownHandler mqttShutdownHandler(MqttPahoMessageDrivenChannelAdapter inboundAdapter, + MqttPahoMessageHandler outboundHandler) { + return new MqttShutdownHandler(inboundAdapter, outboundHandler); + } + + /** + * 自动配置类完成初始化时打印日志。 + */ + @PostConstruct + public void postConstruct() { + log.info("[ContiNew Starter] - Auto Configuration 'MQTT' completed initialization."); + } +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttConsumerProperties.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttConsumerProperties.java new file mode 100644 index 00000000..c935bf4f --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttConsumerProperties.java @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.autoconfigure.properties; + +import org.springframework.boot.context.properties.NestedConfigurationProperty; +import org.springframework.integration.mqtt.core.ClientManager; +import top.continew.starter.messaging.mqtt.enums.MqttQoS; + +/** + * 消费者属性 + * + * @author echo + * @since 2.15.0 + */ +public class MqttConsumerProperties { + + /** + * MQTT 服务质量等级(QoS, Quality of Service)。 + * 0:最多一次(AT_MOST_ONCE),不保证消息到达; + * 1:至少一次(AT_LEAST_ONCE),可能重复; + * 2:只有一次(EXACTLY_ONCE),确保消息仅到达一次。 + * 默认使用 QoS 0。 + */ + private Integer qos = MqttQoS.AT_MOST_ONCE.value(); + + /** + * 消息发送完成等待超时时间(单位:毫秒)。 + * 控制发送消息时,等待 broker 响应的最大时长,超过将报错。 + * 默认值参考 ClientManager.DEFAULT_COMPLETION_TIMEOUT。 + */ + private Long completionTimeout = ClientManager.DEFAULT_COMPLETION_TIMEOUT; + + /** + * 是否自动启动客户端连接。 + * 设置为 true 则在应用启动时自动连接 MQTT 服务器并订阅 Topic。 + * 默认值为 true。 + */ + private Boolean autoStartUp = true; + + /** + * MQTT 客户端 ID。 + * 用于唯一标识客户端连接,同一 broker 下不能重复。 + * 如果为空,可能由系统自动生成。 + */ + private String clientId; + + /** + * 是否启用异步消息发送。 + * 设置为 true 则消息发送不阻塞当前线程,适用于高吞吐场景; + * 设置为 false 则同步发送,便于确认是否成功。 + * 默认值为 false。 + */ + private Boolean async = false; + + /** + * 客户端断开连接时的完成超时时间(单位:毫秒)。 + * 用于控制断连操作的最长等待时间。 + * 默认值参考 ClientManager.DISCONNECT_COMPLETION_TIMEOUT。 + */ + private Long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT; + + /** + * MQTT 消息处理线程池配置。 + * 包含核心线程数、最大线程数、队列容量等,控制消费者消息处理能力。 + * 默认配置为 new MqttExecutorProperties()。 + */ + @NestedConfigurationProperty + private MqttExecutorProperties executor = new MqttExecutorProperties(); + + public Integer getQos() { + return qos; + } + + public void setQos(Integer qos) { + this.qos = qos; + } + + public Long getCompletionTimeout() { + return completionTimeout; + } + + public void setCompletionTimeout(Long completionTimeout) { + this.completionTimeout = completionTimeout; + } + + public Boolean getAutoStartUp() { + return autoStartUp; + } + + public void setAutoStartUp(Boolean autoStartUp) { + this.autoStartUp = autoStartUp; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public Boolean getAsync() { + return async; + } + + public void setAsync(Boolean async) { + this.async = async; + } + + public Long getDisconnectCompletionTimeout() { + return disconnectCompletionTimeout; + } + + public void setDisconnectCompletionTimeout(Long disconnectCompletionTimeout) { + this.disconnectCompletionTimeout = disconnectCompletionTimeout; + } + + public MqttExecutorProperties getExecutor() { + return executor; + } + + public void setExecutor(MqttExecutorProperties executor) { + this.executor = executor; + } +} \ No newline at end of file diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttExecutorProperties.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttExecutorProperties.java new file mode 100644 index 00000000..bc3292bc --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttExecutorProperties.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.autoconfigure.properties; + +/** + * 连接池属性 + * + * @author echo + * @since 2.15.0 + */ +public class MqttExecutorProperties { + + /** + * 线程池核心线程数,表示即使线程处于空闲状态,也会保留的最小线程数。 + * 通常设置为常规负载下的并发处理数量,默认值为 5。 + */ + private Integer corePoolSize = 5; + + /** + * 线程池最大线程数,表示线程池允许创建的最大线程数量。 + * 超过 corePoolSize 后,如果任务队列已满,会继续创建线程直到该值。 + * 默认值为 10。 + */ + private Integer maxPoolSize = 10; + + /** + * 线程池中线程最大空闲时间(单位:秒)。 + * 当线程数量超过 corePoolSize 且处于空闲状态超过该时间时会被销毁。 + * 默认值为 60 秒。 + */ + private Integer keepAliveSeconds = 60; + + /** + * 线程池的任务队列容量。 + * 用于缓冲提交但尚未执行的任务,超过该容量时新任务会触发拒绝策略。 + * 默认值为 512。 + */ + private Integer queueCapacity = 512; + + public Integer getCorePoolSize() { + return corePoolSize; + } + + public void setCorePoolSize(Integer corePoolSize) { + this.corePoolSize = corePoolSize; + } + + public Integer getMaxPoolSize() { + return maxPoolSize; + } + + public void setMaxPoolSize(Integer maxPoolSize) { + this.maxPoolSize = maxPoolSize; + } + + public Integer getKeepAliveSeconds() { + return keepAliveSeconds; + } + + public void setKeepAliveSeconds(Integer keepAliveSeconds) { + this.keepAliveSeconds = keepAliveSeconds; + } + + public Integer getQueueCapacity() { + return queueCapacity; + } + + public void setQueueCapacity(Integer queueCapacity) { + this.queueCapacity = queueCapacity; + } +} \ No newline at end of file diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttProducerProperties.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttProducerProperties.java new file mode 100644 index 00000000..4af4b4c4 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttProducerProperties.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.autoconfigure.properties; + +import org.springframework.boot.context.properties.NestedConfigurationProperty; +import top.continew.starter.messaging.mqtt.enums.MqttQoS; + +/** + * 生产者属性 + * + * @author echo + * @since 2.15.0 + */ +public class MqttProducerProperties { + + /** + * 默认的消息服务质量等级(QoS, Quality of Service)。 + * 0:最多一次(AT_MOST_ONCE),不保证送达; + * 1:至少一次(AT_LEAST_ONCE),可能重复; + * 2:只有一次(EXACTLY_ONCE),确保仅送达一次。 + * 可在发送时指定不同 QoS,此为默认值。 + * 默认值:0(AT_MOST_ONCE)。 + */ + private Integer defaultQos = MqttQoS.AT_MOST_ONCE.value(); + + /** + * MQTT 客户端 ID。 + * 用于标识连接的唯一客户端,在 broker 中必须唯一; + * 若为空,通常系统会自动生成。 + */ + private String clientId; + + /** + * 默认发布的 Topic。 + * 当未指定 topic 时使用此 topic 发送消息。 + * 默认值:"producer"。 + */ + private String defaultTopic = "producer"; + + /** + * 是否启用异步发送模式。 + * true 表示消息发送不会阻塞当前线程; + * false 表示同步等待发送完成。 + * 默认值为 false。 + */ + private Boolean async = false; + + /** + * 是否异步触发发送事件(例如发送回调等)。 + * 仅在 `async = true` 时生效; + * 设置为 true 可提升事件处理性能。 + * 默认值为 false。 + */ + private Boolean asyncEvents = false; + + /** + * 是否设置消息为保留(Retained)消息。 + * Retained 消息在发送后 broker 会保留并在新订阅者订阅时立刻推送; + * 可用于设备初始状态等场景。 + * 默认值为 false(不保留)。 + */ + private Boolean defaultRetained = false; + + /** + * MQTT 消息处理线程池配置。 + * 包含核心线程数、最大线程数、队列容量等,控制消费者消息处理能力。 + * 默认配置为 new MqttExecutorProperties()。 + */ + @NestedConfigurationProperty + private MqttExecutorProperties executor = new MqttExecutorProperties(); + + public Integer getDefaultQos() { + return defaultQos; + } + + public void setDefaultQos(Integer defaultQos) { + this.defaultQos = defaultQos; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getDefaultTopic() { + return defaultTopic; + } + + public void setDefaultTopic(String defaultTopic) { + this.defaultTopic = defaultTopic; + } + + public Boolean getAsync() { + return async; + } + + public void setAsync(Boolean async) { + this.async = async; + } + + public Boolean getAsyncEvents() { + return asyncEvents; + } + + public void setAsyncEvents(Boolean asyncEvents) { + this.asyncEvents = asyncEvents; + } + + public Boolean getDefaultRetained() { + return defaultRetained; + } + + public void setDefaultRetained(Boolean defaultRetained) { + this.defaultRetained = defaultRetained; + } + + public MqttExecutorProperties getExecutor() { + return executor; + } + + public void setExecutor(MqttExecutorProperties executor) { + this.executor = executor; + } +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttProperties.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttProperties.java new file mode 100644 index 00000000..51f5b769 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttProperties.java @@ -0,0 +1,288 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.autoconfigure.properties; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.NestedConfigurationProperty; +import top.continew.starter.core.constant.PropertiesConstants; + +import javax.net.ssl.HostnameVerifier; +import java.util.Properties; + +/** + * 配置参数 + * + * @author echo + * @since 2.15.0 + */ +@ConfigurationProperties(prefix = PropertiesConstants.MESSAGING_MQTT) +public class MqttProperties { + + /** + * 开关 + */ + private boolean enabled; + + /** + * 地址 格式 tcp://192.168.20.95:1883 + */ + private String host; + + /** + * 用户名 + */ + private String username; + + /** + * 密码 + */ + private String password; + + /** + * 保持连接的间隔时间(秒)。客户端会按照此间隔向服务器发送心跳,以维持连接。 + * 默认值:60 秒。 + */ + private Integer keepAliveInterval = 60; + + /** + * 客户端允许同时存在的最大未确认消息数。 + * 如果超出此数量,新的消息将被阻塞直到有确认消息返回。 + * 默认值:10。 + */ + private Integer maxInflight = 10; + + /** + * 遗嘱消息内容。客户端异常断开连接时,由服务器向指定主题发送的消息。 + * 可设置 topic payload、QOS、retained 等属性。 + */ + @NestedConfigurationProperty + private MqttWillProperties will; + + /** + * 配置 SSL 连接所需的客户端属性。 + * 例如:证书路径、密钥密码等。 + */ + @NestedConfigurationProperty + private Properties sslClientProps; + + /** + * 是否启用 HTTPS 主机名验证。 + * 若为 true,将校验服务端证书中的主机名是否与实际连接地址一致。 + * 默认值:false。 + */ + private Boolean httpsHostnameVerificationEnabled = false; + + /** + * 自定义的主机名校验器,用于验证 SSL/TLS 连接时服务端主机名是否合法。 + * 可用于替换默认的验证策略。 + */ + private HostnameVerifier sslHostnameVerifier; + + /** + * 是否使用清洁会话。 + * true 表示连接建立时清除之前的会话信息(订阅、未送达消息等), + * false 表示会话持久化。 + * 默认值:false(持久会话)。 + */ + private Boolean cleanSession = false; + + /** + * 连接超时时间(秒)。客户端尝试连接服务器的最长等待时间。 + * 默认值:30 秒。 + */ + private Integer connectionTimeout = 30; + + /** + * 是否启用自动重连。当连接断开时,是否自动尝试重新连接。 + * 默认值:true。 + */ + private Boolean automaticReconnect = true; + + /** + * 最大重连延迟时间(毫秒)。用于自动重连时的退避策略上限。 + * 默认值:128000(约 2 分钟)。 + */ + private Integer maxReconnectDelay = 128000; + + /** + * 自定义 WebSocket 请求头。 + * 用于配置使用 WebSocket 协议连接时的额外 HTTP 请求头参数。 + */ + @NestedConfigurationProperty + private Properties customWebSocketHeaders; + + /** + * 终止执行服务时等待多长时间(以秒为单位) + */ + private Integer executorServiceTimeout = 1; + + /** + * 生产者 + */ + @NestedConfigurationProperty + private MqttProducerProperties producer = new MqttProducerProperties(); + + /** + * 消费者 + */ + @NestedConfigurationProperty + private MqttConsumerProperties consumer = new MqttConsumerProperties(); + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public Integer getKeepAliveInterval() { + return keepAliveInterval; + } + + public void setKeepAliveInterval(Integer keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + } + + public Integer getMaxInflight() { + return maxInflight; + } + + public void setMaxInflight(Integer maxInflight) { + this.maxInflight = maxInflight; + } + + public MqttWillProperties getWill() { + return will; + } + + public void setWill(MqttWillProperties will) { + this.will = will; + } + + public Properties getSslClientProps() { + return sslClientProps; + } + + public void setSslClientProps(Properties sslClientProps) { + this.sslClientProps = sslClientProps; + } + + public Boolean getHttpsHostnameVerificationEnabled() { + return httpsHostnameVerificationEnabled; + } + + public void setHttpsHostnameVerificationEnabled(Boolean httpsHostnameVerificationEnabled) { + this.httpsHostnameVerificationEnabled = httpsHostnameVerificationEnabled; + } + + public HostnameVerifier getSslHostnameVerifier() { + return sslHostnameVerifier; + } + + public void setSslHostnameVerifier(HostnameVerifier sslHostnameVerifier) { + this.sslHostnameVerifier = sslHostnameVerifier; + } + + public Boolean getCleanSession() { + return cleanSession; + } + + public void setCleanSession(Boolean cleanSession) { + this.cleanSession = cleanSession; + } + + public Integer getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(Integer connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public Boolean getAutomaticReconnect() { + return automaticReconnect; + } + + public void setAutomaticReconnect(Boolean automaticReconnect) { + this.automaticReconnect = automaticReconnect; + } + + public Integer getMaxReconnectDelay() { + return maxReconnectDelay; + } + + public void setMaxReconnectDelay(Integer maxReconnectDelay) { + this.maxReconnectDelay = maxReconnectDelay; + } + + public Properties getCustomWebSocketHeaders() { + return customWebSocketHeaders; + } + + public void setCustomWebSocketHeaders(Properties customWebSocketHeaders) { + this.customWebSocketHeaders = customWebSocketHeaders; + } + + public Integer getExecutorServiceTimeout() { + return executorServiceTimeout; + } + + public void setExecutorServiceTimeout(Integer executorServiceTimeout) { + this.executorServiceTimeout = executorServiceTimeout; + } + + public MqttProducerProperties getProducer() { + return producer; + } + + public void setProducer(MqttProducerProperties producer) { + this.producer = producer; + } + + public MqttConsumerProperties getConsumer() { + return consumer; + } + + public void setConsumer(MqttConsumerProperties consumer) { + this.consumer = consumer; + } +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttWillProperties.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttWillProperties.java new file mode 100644 index 00000000..9a30af79 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/autoconfigure/properties/MqttWillProperties.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.autoconfigure.properties; + +import top.continew.starter.messaging.mqtt.enums.MqttQoS; + +/** + * 遗嘱消息属性 + * + * @author echo + * @since 2.15.0 + */ +public class MqttWillProperties { + + /** + * 遗嘱消息的目标主题。 + * 当客户端异常断开时,将向该主题发布遗嘱消息。 + */ + private String topic; + + /** + * 遗嘱消息内容 + */ + private String payload; + + /** + * 遗嘱消息的 QoS 等级 + * 0:最多一次;1:至少一次;2:只有一次 + * 默认值:0 + */ + private Integer qos = MqttQoS.AT_MOST_ONCE.value(); + + /** + * 是否设置为保留消息 + * true:新订阅者会立即收到该消息 + * 默认值:false + */ + private Boolean retained = false; + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getPayload() { + return payload; + } + + public void setPayload(String payload) { + this.payload = payload; + } + + public Integer getQos() { + return qos; + } + + public void setQos(Integer qos) { + this.qos = qos; + } + + public Boolean getRetained() { + return retained; + } + + public void setRetained(Boolean retained) { + this.retained = retained; + } +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/constant/MqttConstant.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/constant/MqttConstant.java new file mode 100644 index 00000000..890666bb --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/constant/MqttConstant.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.constant; + +/** + * mqtt常量 + * + * @author echo + * @since 2.15.0 + */ +public class MqttConstant { + + /** + * MQTT 入站通道名称(消费者使用,接收消息的入口) + */ + public static final String MQTT_INPUT_CHANNEL_NAME = "mqttInputChannel"; + + /** + * MQTT 出站通道名称(生产者使用,发送消息的出口) + */ + public static final String MQTT_OUT_BOUND_CHANNEL_NAME = "mqttOutboundChannel"; + + /** + * 应用级消息消费处理通道名称(接收到 MQTT 消息后转发到此通道供业务处理) + */ + public static final String CONSUMER_CHANNEL_NAME = "consumerChannel"; + +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/enums/MqttQoS.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/enums/MqttQoS.java new file mode 100644 index 00000000..4e4edbbd --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/enums/MqttQoS.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.enums; + +import top.continew.starter.messaging.mqtt.exception.MqttException; + +/** + * qos 消息质量等级枚举 + * + * @author echo + * @since 2.15.0 + */ +public enum MqttQoS { + + /** + * QoS level 0 至多发送一次,发送即丢弃。没有确认消息,也不知道对方是否收到。 + */ + AT_MOST_ONCE(0), + /** + * QoS level 1 至少一次,都要在可变头部中附加一个16位的消息ID,SUBSCRIBE 和 UNSUBSCRIBE 消息使用 QoS level 1。 + */ + AT_LEAST_ONCE(1), + /** + * QoS level 2 确保只有一次,仅仅在 PUBLISH 类型消息中出现,要求在可变头部中要附加消息ID。 + */ + EXACTLY_ONCE(2), + /** + * 失败 + */ + FAILURE(0x80); + + private final int value; + + MqttQoS(int value) { + this.value = value; + } + + public int value() { + return value; + } + + public static MqttQoS valueOf(int value) { + return switch (value) { + case 0 -> AT_MOST_ONCE; + case 1 -> AT_LEAST_ONCE; + case 2 -> EXACTLY_ONCE; + case 0x80 -> FAILURE; + default -> throw new MqttException("无效的 QoS: " + value); + }; + } + + @Override + public String toString() { + return "QoS" + value; + } +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/enums/TopicFilterType.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/enums/TopicFilterType.java new file mode 100644 index 00000000..25f528d3 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/enums/TopicFilterType.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.enums; + +import top.continew.starter.messaging.mqtt.exception.MqttException; +import top.continew.starter.messaging.mqtt.util.TopicUtils; + +/** + * topic 筛选器类型 + * + * @author echo + * @since 2.15.0 + */ +public enum TopicFilterType { + + /** + * 默认 TopicFilter + */ + NONE { + @Override + public boolean match(String topicFilter, String topicName) { + return TopicUtils.match(topicFilter, topicName); + } + }, + + /** + * $queue/ 为前缀的共享订阅是不带群组的共享订阅 + */ + QUEUE { + @Override + public boolean match(String topicFilter, String topicName) { + int prefixLen = TopicFilterType.SHARE_QUEUE_PREFIX.length(); + return TopicUtils.match(topicFilter.substring(prefixLen), topicName); + } + }, + + /** + * $share/{group-name}/ 为前缀的共享订阅是带群组的共享订阅 + */ + SHARE { + @Override + public boolean match(String topicFilter, String topicName) { + // 去除前缀 $share// ,匹配 topicName / 前缀 + int prefixLen = TopicFilterType.findShareTopicIndex(topicFilter); + return TopicUtils.match(topicFilter.substring(prefixLen), topicName); + } + }; + + /** + * 共享订阅的 topic + */ + public static final String SHARE_QUEUE_PREFIX = "$queue/"; + + public static final String SHARE_GROUP_PREFIX = "$share/"; + + /** + * 判断 topicFilter 和 topicName 匹配情况 + * + * @param topicFilter topicFilter + * @param topicName topicName + * @return 是否匹配 + */ + public abstract boolean match(String topicFilter, String topicName); + + /** + * 获取 topicFilter 类型 + * + * @param topicFilter topicFilter + * @return TopicFilterType + */ + public static TopicFilterType getType(String topicFilter) { + if (topicFilter.startsWith(TopicFilterType.SHARE_QUEUE_PREFIX)) { + return TopicFilterType.QUEUE; + } else if (topicFilter.startsWith(TopicFilterType.SHARE_GROUP_PREFIX)) { + return TopicFilterType.SHARE; + } else { + return TopicFilterType.NONE; + } + } + + /** + * 读取共享订阅的分组名 + * + * @param topicFilter topicFilter + * @return 共享订阅分组名 + */ + public static String getShareGroupName(String topicFilter) { + int prefixLength = TopicFilterType.SHARE_GROUP_PREFIX.length(); + int topicFilterLength = topicFilter.length(); + for (int i = prefixLength; i < topicFilterLength; i++) { + char ch = topicFilter.charAt(i); + if ('/' == ch) { + return topicFilter.substring(prefixLength, i); + } + } + throw new MqttException("分享订阅topic过滤器: " + topicFilter + " 不符合 $share//xxx"); + } + + private static int findShareTopicIndex(String topicFilter) { + int prefixLength = TopicFilterType.SHARE_GROUP_PREFIX.length(); + int topicFilterLength = topicFilter.length(); + for (int i = prefixLength; i < topicFilterLength; i++) { + char ch = topicFilter.charAt(i); + if ('/' == ch) { + return i + 1; + } + } + throw new MqttException("分享订阅主题过滤器: " + topicFilter + " 不符合 $share//xxx"); + } +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/exception/MqttException.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/exception/MqttException.java new file mode 100644 index 00000000..53c66c24 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/exception/MqttException.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.exception; + +import top.continew.starter.core.exception.BaseException; + +import java.io.Serial; + +/** + * mqtt异常 + * + * @author echo + * @since 2.15.0 + */ +public class MqttException extends BaseException { + + @Serial + private static final long serialVersionUID = 1L; + + public MqttException() { + } + + public MqttException(String message) { + super(message); + } + + public MqttException(Throwable cause) { + super(cause); + } + + public MqttException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/handler/MqttMessageInboundHandler.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/handler/MqttMessageInboundHandler.java new file mode 100644 index 00000000..6372d6fa --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/handler/MqttMessageInboundHandler.java @@ -0,0 +1,231 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.handler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.core.env.Environment; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; +import top.continew.starter.messaging.mqtt.annotation.MqttListener; +import top.continew.starter.messaging.mqtt.constant.MqttConstant; +import top.continew.starter.messaging.mqtt.enums.TopicFilterType; +import top.continew.starter.messaging.mqtt.exception.MqttException; +import top.continew.starter.messaging.mqtt.model.MqttMessage; +import top.continew.starter.messaging.mqtt.msg.MqttMessageConsumer; +import top.continew.starter.messaging.mqtt.strategy.MqttOptions; +import top.continew.starter.messaging.mqtt.util.TopicUtils; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 消息调度器 路由分发中心 + * + * @author echo + * @since 2.15.0 + */ +public class MqttMessageInboundHandler implements MessageHandler, InitializingBean, ApplicationContextAware { + + private static final Logger log = LoggerFactory.getLogger(MqttMessageInboundHandler.class); + + // 精确匹配的topic -> 监听器映射(用于@MqttListener注解的监听器) + private final Map annotatedListenerMap = new ConcurrentHashMap<>(); + + // 所有实现了MqttMessageListener的Bean列表(用于动态订阅) + private final List allListeners; + + private final MqttOptions mqttOptions; + + private final Environment environment; + + @Nullable + private ApplicationContext applicationContext; + + public MqttMessageInboundHandler(List mqttMessageConsumerList, + MqttOptions mqttOptions, + Environment environment) { + this.allListeners = mqttMessageConsumerList; + this.mqttOptions = mqttOptions; + this.environment = environment; + } + + @Override + @ServiceActivator(inputChannel = MqttConstant.MQTT_INPUT_CHANNEL_NAME) + public void handleMessage(Message message) throws MessagingException { + MqttMessage mqttMessage = MqttMessage.of(message); + String topic = mqttMessage.getTopic(); + // 1. 优先处理通过@MqttListener注解订阅的精确匹配 + boolean handled = handleAnnotatedListeners(topic, mqttMessage); + // 2. 如果没有精确匹配的注解监听器,则广播给所有监听器 + if (!handled) { + broadcastToAllListeners(mqttMessage); + } + } + + /** + * 处理通过@MqttListener注解订阅的监听器 + * + * @return true if message was handled by annotated listener + */ + private boolean handleAnnotatedListeners(String topic, MqttMessage mqttMessage) { + boolean handled = false; + + for (Map.Entry entry : annotatedListenerMap.entrySet()) { + String topicFilter = entry.getKey(); + try { + if (isTopicMatch(topicFilter, topic)) { + entry.getValue().onMessage(mqttMessage); + handled = true; + } + } catch (Exception e) { + log.error("注解监听器处理消息时发生错误: {}", entry.getValue().getClass().getSimpleName(), e); + } + } + + return handled; + } + + /** + * 广播消息给所有监听器(用于动态订阅的场景) + */ + private void broadcastToAllListeners(MqttMessage mqttMessage) { + // 获取当前订阅的所有topics + List subscribedTopics = mqttOptions.listTopics(); + + // 检查消息的topic是否在订阅列表中 + boolean isSubscribed = subscribedTopics.stream() + .anyMatch(subscribedTopic -> isTopicMatch(subscribedTopic, mqttMessage.getTopic())); + + if (!isSubscribed) { + return; + } + + // 广播给所有没有@MqttListener注解的监听器 + for (MqttMessageConsumer listener : allListeners) { + // 跳过已经通过注解处理的监听器 + if (isAnnotatedListener(listener)) { + continue; + } + try { + listener.onMessage(mqttMessage); + } catch (Exception e) { + log.error("监听器处理消息时发生错误: {}", listener.getClass().getSimpleName(), e); + } + } + } + + /** + * 检查是否是注解监听器 + */ + private boolean isAnnotatedListener(MqttMessageConsumer listener) { + return annotatedListenerMap.containsValue(listener); + } + + /** + * 判断topic是否匹配 + */ + private boolean isTopicMatch(String topicFilter, String actualTopic) { + // 精确匹配 + if (topicFilter.equals(actualTopic)) { + return true; + } + + // 通配符匹配 + if (TopicUtils.isTopicFilter(topicFilter) && TopicUtils.match(topicFilter, actualTopic)) { + return true; + } + + // 共享订阅匹配 + return TopicFilterType.SHARE.equals(TopicFilterType.getType(topicFilter)) && TopicFilterType.SHARE + .match(topicFilter, actualTopic); + } + + @Override + public void afterPropertiesSet() { + Assert.notNull(applicationContext, "applicationContext不能为null"); + if (CollectionUtils.isEmpty(allListeners)) { + return; + } + // 处理带有@MqttListener注解的监听器 + for (MqttMessageConsumer mqttMessageConsumer : allListeners) { + Class clazz = ClassUtils.getUserClass(mqttMessageConsumer); + MqttListener mqttListener = AnnotationUtils.findAnnotation(clazz, MqttListener.class); + + if (Objects.nonNull(mqttListener)) { + String topic = resolvePlaceholder(mqttListener.topic()); + int qos = resolveQos(mqttListener.qos()); + mqttOptions.addTopic(topic, qos); + annotatedListenerMap.put(topic, mqttMessageConsumer); + } else { + log.info("发现无注解监听器: {},将接收所有动态订阅的消息", clazz.getSimpleName()); + } + } + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + /** + * 解析占位符,支持 ${} 和直接字符串 + */ + private String resolvePlaceholder(String value) { + if (StringUtils.hasText(value)) { + return environment.resolvePlaceholders(value); + } + return value; + } + + /** + * 解析 QoS 配置,支持占位符和直接数字 + * + * @param qosValue QoS 字符串值或占位符 + * @return QoS 等级 (0, 1, 或 2) + * @throws MqttException 如果 QoS 值无效 + */ + private int resolveQos(String qosValue) { + try { + String resolved = resolvePlaceholder(qosValue); + int qos = Integer.parseInt(resolved); + + // ✅ 验证 QoS 值是否合法 + if (qos < 0 || qos > 2) { + throw new MqttException("QoS 值必须在 0-2 之间, 当前值: " + qos); + } + + return qos; + } catch (NumberFormatException e) { + throw new MqttException("无法解析 QoS 值: " + qosValue + ", 必须是 0, 1 或 2", e); + } + } +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/handler/MqttShutdownHandler.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/handler/MqttShutdownHandler.java new file mode 100644 index 00000000..02a656e3 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/handler/MqttShutdownHandler.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.handler; + +import org.springframework.beans.factory.DisposableBean; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; + +/** + * mqtt关闭处理程序 + * + * @author echo + * @since 2.15.0 + */ +@SuppressWarnings("ClassCanBeRecord") +public class MqttShutdownHandler implements DisposableBean, ApplicationListener { + + private final MqttPahoMessageDrivenChannelAdapter inboundAdapter; + private final MqttPahoMessageHandler outboundHandler; + + public MqttShutdownHandler(MqttPahoMessageDrivenChannelAdapter inboundAdapter, + MqttPahoMessageHandler outboundHandler) { + this.inboundAdapter = inboundAdapter; + this.outboundHandler = outboundHandler; + } + + @Override + public void onApplicationEvent(ContextClosedEvent event) { + stopMqttConnections(); + } + + @Override + public void destroy() throws Exception { + stopMqttConnections(); + } + + private void stopMqttConnections() { + try { + if (inboundAdapter != null && inboundAdapter.isRunning()) { + inboundAdapter.stop(); + } + + if (outboundHandler != null) { + outboundHandler.stop(); + } + // 给一点时间让连接正常关闭 + Thread.sleep(500); + } catch (Exception ignored) { + } + } +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/model/MqttMessage.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/model/MqttMessage.java new file mode 100644 index 00000000..54dc1d51 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/model/MqttMessage.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.model; + +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import top.continew.starter.messaging.mqtt.enums.MqttQoS; + +import java.io.Serializable; +import java.util.Objects; + +/** + * mqtt消息实体 + * + * @author echo + * @since 2.15.0 + */ +public class MqttMessage implements Serializable { + + /** + * 消息头信息,包含 MQTT 消息的元数据,如 topic、QOS、retained 等。 + */ + private MessageHeaders messageHeaders; + + /** + * 消息体(负载),可以是字符串、字节数组或其他对象类型。 + */ + private Object payload; + + /** + * MQTT 主题,用于标识消息的发布/订阅通道。 + */ + private String topic; + + /** + * 消息服务质量等级(QOS): + * 0 - 最多一次,消息可能会丢失; + * 1 - 至少一次,消息可能重复; + * 2 - 只有一次,确保消息不重复也不丢失。 + */ + private Integer qos; + + /** + * 是否保留该消息(Retained): + * true 表示该消息会保留在 MQTT 服务器上,供新订阅者立即获取; + * false 表示仅当前订阅者接收到该消息。 + */ + private Boolean retained; + + public MqttMessage(Message message) { + this.messageHeaders = message.getHeaders(); + this.topic = (String)Objects.requireNonNull(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC)); + this.qos = (Integer)Objects.requireNonNull(message.getHeaders().get(MqttHeaders.RECEIVED_QOS)); + this.retained = (Boolean)Objects.requireNonNull(message.getHeaders().get(MqttHeaders.RECEIVED_RETAINED)); + this.payload = message.getPayload(); + } + + public MqttMessage(Object payload, String topic) { + this.payload = payload; + this.topic = topic; + this.qos = MqttQoS.AT_MOST_ONCE.value(); + this.retained = false; + } + + public MqttMessage(Object payload, String topic, Integer qos) { + this.payload = payload; + this.topic = topic; + this.qos = qos; + this.retained = false; + } + + public MqttMessage(Object payload, String topic, Integer qos, Boolean retained) { + this.payload = payload; + this.topic = topic; + this.qos = qos; + this.retained = retained; + } + + public static MqttMessage of(Message message) { + return new MqttMessage(message); + } + + public static MqttMessage of(Object payload, String topic, Integer qos) { + return new MqttMessage(payload, topic, qos); + } + + public MessageHeaders getMessageHeaders() { + return messageHeaders; + } + + public void setMessageHeaders(MessageHeaders messageHeaders) { + this.messageHeaders = messageHeaders; + } + + public Object getPayload() { + return payload; + } + + public void setPayload(Object payload) { + this.payload = payload; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public Integer getQos() { + return qos; + } + + public void setQos(Integer qos) { + this.qos = qos; + } + + public Boolean getRetained() { + return retained; + } + + public void setRetained(Boolean retained) { + this.retained = retained; + } +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/msg/MqttMessageConsumer.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/msg/MqttMessageConsumer.java new file mode 100644 index 00000000..50bc7837 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/msg/MqttMessageConsumer.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.msg; + +import top.continew.starter.messaging.mqtt.handler.MqttMessageInboundHandler; +import top.continew.starter.messaging.mqtt.model.MqttMessage; + +/** + * 消息监听 - 消费者 + * + * @author echo + * @since 2.15.0 + */ +public interface MqttMessageConsumer { + + /** + * 消息订阅 + * + * @param message {@link MqttMessage} + * @see MqttMessageInboundHandler + */ + void onMessage(MqttMessage message); +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/msg/MqttMessageProducer.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/msg/MqttMessageProducer.java new file mode 100644 index 00000000..24e3c2c0 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/msg/MqttMessageProducer.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.msg; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import top.continew.starter.messaging.mqtt.constant.MqttConstant; + +/** + * 消息发送 - 生产者 + * + * @author echo + * @since 2.15.0 + **/ +@MessagingGateway(defaultRequestChannel = MqttConstant.MQTT_OUT_BOUND_CHANNEL_NAME) +public interface MqttMessageProducer { + + /** + * 消息发送 - 默认topic + * + * @param payload 消息体 + */ + void sendToMqtt(String payload); + + /** + * 指定topic进行消息发送 + * + * @param topic topic + * @param payload 消息体 + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Payload String payload); + + /** + * 指定topic进行消息发送 + * + * @param topic topic + * @param qos qos + * @param payload 消息体 + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, + @Header(MqttHeaders.QOS) int qos, + @Header(MqttHeaders.RETAINED) boolean retained, + @Payload String payload); + + /** + * 指定topic进行消息发送 + * + * @param topic topic + * @param qos qos + * @param payload 消息体 + */ + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, + @Header(MqttHeaders.QOS) int qos, + @Header(MqttHeaders.RETAINED) boolean retained, + @Payload byte[] payload); +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/strategy/MqttOptions.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/strategy/MqttOptions.java new file mode 100644 index 00000000..61c43371 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/strategy/MqttOptions.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.strategy; + +import java.util.Collection; +import java.util.List; + +/** + * MQTT 客户端操作接口 + *

+ * 提供 topic 管理与消息发布能力。 + * 实现类可基于不同 MQTT 客户端或通信方式进行扩展。 + *

+ * + * @author echo + * @since 2.15.0 + */ +public interface MqttOptions { + + /** + * 添加topic + * + * @param topic topic + */ + void addTopic(String topic); + + /** + * 添加topic + * + * @param topic topic + * @param qos qos + */ + void addTopic(String topic, int qos); + + /** + * 删除topic + * + * @param topic topic + */ + void removeTopic(String topic); + + /** + * 查询订阅的所有topic + * + * @return topic list + */ + List listTopics(); + + /** + * 批量添加订阅主题(默认 QoS) + * + * @param topics 订阅的主题列表 + */ + void addTopics(String... topics); + + /** + * 批量添加订阅主题,指定统一 QoS 等级 + * + * @param qos 消息服务质量等级(0/1/2) + * @param topics 订阅的主题列表 + */ + void addTopics(int qos, String... topics); + + /** + * 批量添加订阅主题(默认 QoS) + * + * @param topics 订阅的主题集合(List 形式) + */ + void addTopics(List topics); + + /** + * 批量添加订阅主题,指定统一 QoS 等级 + * + * @param topics 订阅的主题集合(List 形式) + * @param qos 消息服务质量等级(0/1/2) + */ + void addTopics(List topics, int qos); + + /** + * 批量取消订阅主题 + * + * @param topics 要取消订阅的主题列表 + */ + void removeTopics(String... topics); + + /** + * 批量取消订阅主题 + * + * @param topics 要取消订阅的主题集合(List 形式) + */ + void removeTopics(List topics); + + /** + * 批量取消订阅主题 + * + * @param topics 要取消订阅的主题集合(Collection 形式) + */ + void removeTopics(Collection topics); + + /** + * 判断是否已订阅指定主题 + * + * @param topic 主题名称 + * @return 如果已订阅该主题,返回 true;否则返回 false + */ + boolean containsTopic(String topic); + + /** + * 清空所有已订阅的主题 + */ + void clearAllTopics(); + + /** + * 获取当前已订阅的主题数量 + * + * @return 已订阅的主题数量 + */ + int getTopicCount(); +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/strategy/MqttTemplate.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/strategy/MqttTemplate.java new file mode 100644 index 00000000..972097ba --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/strategy/MqttTemplate.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.strategy; + +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import top.continew.starter.messaging.mqtt.enums.MqttQoS; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +/** + * MQTT 客户端发布+订阅封装器 + * + * @author echo + * @since 2.15.0 + **/ +@SuppressWarnings("ClassCanBeRecord") +public class MqttTemplate implements MqttOptions { + + private final MqttPahoMessageDrivenChannelAdapter adapter; + + public MqttTemplate(MqttPahoMessageDrivenChannelAdapter adapter) { + this.adapter = adapter; + } + + @Override + public void addTopic(String topic) { + this.addTopic(topic, MqttQoS.AT_MOST_ONCE.value()); + } + + @Override + public void addTopic(String topic, int qos) { + Set topics = Set.of(adapter.getTopic()); + if (topics.contains(topic)) { + return; + } + adapter.addTopic(topic, qos); + } + + @Override + public void removeTopic(String topic) { + Set topics = Set.of(adapter.getTopic()); + if (!topics.contains(topic)) { + return; + } + adapter.removeTopic(topic); + } + + @Override + public List listTopics() { + return List.of(adapter.getTopic()); + } + + @Override + public void addTopics(String... topics) { + addTopics(MqttQoS.AT_MOST_ONCE.value(), topics); + } + + @Override + public void addTopics(int qos, String... topics) { + if (topics == null) { + return; + } + for (String topic : topics) { + addTopic(topic, qos); + } + } + + @Override + public void addTopics(List topics) { + addTopics(topics, MqttQoS.AT_MOST_ONCE.value()); + } + + @Override + public void addTopics(List topics, int qos) { + if (topics == null || topics.isEmpty()) { + return; + } + topics.forEach(topic -> addTopic(topic, qos)); + } + + @Override + public void removeTopics(String... topics) { + if (topics == null) { + return; + } + for (String topic : topics) { + removeTopic(topic); + } + } + + @Override + public void removeTopics(List topics) { + if (topics == null || topics.isEmpty()) { + return; + } + topics.forEach(this::removeTopic); + } + + @Override + public void removeTopics(Collection topics) { + if (topics == null || topics.isEmpty()) { + return; + } + topics.forEach(this::removeTopic); + } + + @Override + public boolean containsTopic(String topic) { + if (topic == null || topic.trim().isEmpty()) { + return false; + } + Set topics = Set.of(adapter.getTopic()); + return topics.contains(topic); + } + + @Override + public void clearAllTopics() { + List currentTopics = listTopics(); + currentTopics.forEach(this::removeTopic); + } + + @Override + public int getTopicCount() { + return listTopics().size(); + } +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/util/TopicUtils.java b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/util/TopicUtils.java new file mode 100644 index 00000000..26587ef6 --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/java/top/continew/starter/messaging/mqtt/util/TopicUtils.java @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2022-present Charles7c Authors. All Rights Reserved. + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package top.continew.starter.messaging.mqtt.util; + +import top.continew.starter.messaging.mqtt.exception.MqttException; + +import java.util.List; + +/** + * 消息主题工具类 + * + * @author echo + * @since 2.15.0 + */ +public class TopicUtils { + + public static final char TOPIC_WILDCARDS_ONE = '+'; + + public static final char TOPIC_WILDCARDS_MORE = '#'; + + public TopicUtils() { + } + + /** + * 校验 topicFilter + * + * @param topicFilterList topicFilter 集合 + */ + public static void validateTopicFilter(List topicFilterList) { + for (String topicFilter : topicFilterList) { + validateTopicFilter(topicFilter); + } + } + + /** + * 校验 topicFilter + * + * @param topicFilter topicFilter + */ + public static void validateTopicFilter(String topicFilter) throws MqttException { + if (topicFilter == null || topicFilter.isEmpty()) { + throw new MqttException("TopicFilter is blank:" + topicFilter); + } + char[] topicFilterChars = topicFilter.toCharArray(); + int topicFilterLength = topicFilterChars.length; + int topicFilterIdxEnd = topicFilterLength - 1; + char ch; + for (int i = 0; i < topicFilterLength; i++) { + ch = topicFilterChars[i]; + if (Character.isWhitespace(ch)) { + throw new MqttException("Mqtt subscribe topicFilter has white space:" + topicFilter); + } else if (ch == TOPIC_WILDCARDS_MORE) { + // 校验: # 通配符只能在最后一位 + if (i < topicFilterIdxEnd) { + throw new MqttException("Mqtt subscribe topicFilter illegal:" + topicFilter); + } + } else if (ch == TOPIC_WILDCARDS_ONE) { + // 校验: 单独 + 是允许的,判断 + 号前一位是否为 /,如果有后一位也必须为 / + if ((i > 0 && topicFilterChars[i - 1] != '/') || (i < topicFilterIdxEnd && topicFilterChars[i + 1] != '/')) { + throw new MqttException("Mqtt subscribe topicFilter illegal:" + topicFilter); + } + } + } + } + + /** + * 判断是否 topic filter + * + * @param topicFilter topicFilter + * @return 是否 topic filter + */ + public static boolean isTopicFilter(String topicFilter) { + char[] topicFilterChars = topicFilter.toCharArray(); + for (char ch : topicFilterChars) { + if (TOPIC_WILDCARDS_ONE == ch || TOPIC_WILDCARDS_MORE == ch) { + return true; + } + } + return false; + } + + /** + * 校验 topicName + * + * @param topicName topicName + */ + public static void validateTopicName(String topicName) throws MqttException { + if (topicName.isEmpty()) { + throw new MqttException("Topic is blank:" + topicName); + } + if (isTopicFilter(topicName)) { + throw new MqttException("Topic has wildcards char [+] or [#], topicName:" + topicName); + } + } + + /** + * 判断 topicFilter topicName 是否匹配 + * + * @param topicFilter topicFilter + * @param topicName topicName + * @return 是否匹配 + */ + public static boolean match(String topicFilter, String topicName) { + char[] topicFilterChars = topicFilter.toCharArray(); + char[] topicNameChars = topicName.toCharArray(); + int topicFilterLength = topicFilterChars.length; + int topicNameLength = topicNameChars.length; + int topicFilterIdxEnd = topicFilterLength - 1; + int topicNameIdxEnd = topicNameLength - 1; + char ch; + // 是否进入 + 号层级通配符 + boolean inLayerWildcard = false; + int wildcardCharLen = 0; + topicFilterLoop: + for (int i = 0; i < topicFilterLength; i++) { + ch = topicFilterChars[i]; + if (ch == TOPIC_WILDCARDS_MORE) { + // 校验: # 通配符只能在最后一位 + if (i < topicFilterIdxEnd) { + throw new MqttException("Mqtt subscribe topicFilter illegal:" + topicFilter); + } + return true; + } else if (ch == TOPIC_WILDCARDS_ONE) { + // 校验: 单独 + 是允许的,判断 + 号前一位是否为 /,如果有后一位也必须为 / + if ((i > 0 && topicFilterChars[i - 1] != '/') || (i < topicFilterIdxEnd && topicFilterChars[i + 1] != '/')) { + throw new MqttException("Mqtt subscribe topicFilter illegal:" + topicFilter); + } + // 如果 + 是最后一位,判断 topicName 中是否还存在层级 / + // topicName index + int topicNameIdx = i + wildcardCharLen; + if (i == topicFilterIdxEnd && topicNameLength > topicNameIdx) { + for (int j = topicNameIdx; j < topicNameLength; j++) { + if (topicNameChars[j] == '/') { + return false; + } + } + return true; + } + inLayerWildcard = true; + } else if (ch == '/') { + if (inLayerWildcard) { + inLayerWildcard = false; + } + // 预读下一位,如果是 #,并且 topicName 位数已经不足 + int next = i + 1; + if ((topicFilterLength > next) && topicFilterChars[next] == '#' && topicNameLength < next) { + return true; + } + } + // topicName 长度不够了 + if (topicNameIdxEnd < i) { + return false; + } + // 进入通配符 + if (inLayerWildcard) { + for (int j = i + wildcardCharLen; j < topicNameLength; j++) { + if (topicNameChars[j] == '/') { + wildcardCharLen--; + continue topicFilterLoop; + } else { + wildcardCharLen++; + } + } + } + // topicName index + int topicNameIdx = i + wildcardCharLen; + // topic 已经完成,topicName 还有数据 + if (topicNameIdx > topicNameIdxEnd) { + return false; + } + if (ch != topicNameChars[topicNameIdx]) { + return false; + } + } + // 判断 topicName 是否还有数据 + return topicFilterLength + wildcardCharLen + 1 > topicNameLength; + } + + /** + * 获取处理完成之后的 topic + * + * @param topicTemplate topic 模板 + * @return 获取处理完成之后的 topic + */ + public static String getTopicFilter(String topicTemplate) { + // 替换 ${name} 为 + + StringBuilder sb = new StringBuilder(topicTemplate.length()); + int cursor = 0; + for (int start, end; (start = topicTemplate.indexOf("${", cursor)) != -1 && (end = topicTemplate + .indexOf('}', start)) != -1;) { + sb.append(topicTemplate, cursor, start); + sb.append('+'); + cursor = end + 1; + } + sb.append(topicTemplate.substring(cursor)); + return sb.toString(); + } +} diff --git a/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..b9e17d1c --- /dev/null +++ b/continew-starter-messaging/continew-starter-messaging-mqtt/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +top.continew.starter.messaging.mqtt.autoconfigure.MqttAutoConfiguration \ No newline at end of file diff --git a/continew-starter-messaging/pom.xml b/continew-starter-messaging/pom.xml index 44e356be..777906fe 100644 --- a/continew-starter-messaging/pom.xml +++ b/continew-starter-messaging/pom.xml @@ -18,6 +18,7 @@ continew-starter-messaging-mail continew-starter-messaging-websocket + continew-starter-messaging-mqtt