mirror of
https://github.com/continew-org/continew-admin.git
synced 2025-09-10 20:57:14 +08:00
feat: 新增 WebSocket 消息通知 (#67)
This commit is contained in:
@@ -135,5 +135,10 @@
|
||||
<groupId>top.continew</groupId>
|
||||
<artifactId>continew-starter-json-jackson</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
@@ -0,0 +1,58 @@
|
||||
/*
|
||||
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package top.continew.admin.common.config.websocket;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
||||
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
|
||||
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
|
||||
import top.continew.admin.common.handler.MyWebSocketHandler;
|
||||
|
||||
/**
|
||||
* WebSocketConfig配置
|
||||
*
|
||||
* @author WeiRan
|
||||
* @since 2024.03.13 16:45
|
||||
*/
|
||||
@Configuration
|
||||
@EnableWebSocket
|
||||
public class WebSocketConfig implements WebSocketConfigurer {
|
||||
|
||||
@Resource
|
||||
private WebsocketInterceptor customWebsocketInterceptor;
|
||||
|
||||
@Resource
|
||||
private MyWebSocketHandler myWebSocketHandler;
|
||||
|
||||
/**
|
||||
* 注册WebSocket处理程序并设置必要的配置。
|
||||
*
|
||||
* @param registry 用于注册处理程序的WebSocketHandlerRegistry
|
||||
*/
|
||||
@Override
|
||||
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
|
||||
registry
|
||||
// 设置处理器处理/custom/**
|
||||
.addHandler(myWebSocketHandler, "/ws")
|
||||
// 允许跨越
|
||||
.setAllowedOrigins("*")
|
||||
// 设置监听器
|
||||
.addInterceptors(customWebsocketInterceptor);
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,119 @@
|
||||
/*
|
||||
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package top.continew.admin.common.config.websocket;
|
||||
|
||||
/**
|
||||
* Created by WeiRan on 2024.03.13 16:43
|
||||
*/
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.servlet.JakartaServletUtil;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.server.ServerHttpRequest;
|
||||
import org.springframework.http.server.ServerHttpResponse;
|
||||
import org.springframework.http.server.ServletServerHttpRequest;
|
||||
import org.springframework.http.server.ServletServerHttpResponse;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.WebSocketHandler;
|
||||
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
|
||||
import top.continew.admin.common.model.dto.LoginUser;
|
||||
import top.continew.admin.common.util.helper.LoginHelper;
|
||||
import top.continew.starter.web.util.ServletUtils;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 用来处理webscocket拦截器
|
||||
*
|
||||
* @author WeiRan
|
||||
* @since 2024.03.13 16:45
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class WebsocketInterceptor extends HttpSessionHandshakeInterceptor {
|
||||
|
||||
/**
|
||||
* 在建立 WebSocket 连接之前处理握手过程。
|
||||
*
|
||||
* @param request HTTP 请求对象
|
||||
* @param response HTTP 响应对象
|
||||
* @param wsHandler WebSocket 处理程序
|
||||
* @param attributes 用于存储 WebSocket 会话的自定义属性的映射
|
||||
* @return 如果握手成功则返回 true,否则返回 false
|
||||
* @throws Exception 如果在握手过程中发生错误
|
||||
*/
|
||||
@Override
|
||||
public boolean beforeHandshake(ServerHttpRequest request,
|
||||
ServerHttpResponse response,
|
||||
WebSocketHandler wsHandler,
|
||||
Map<String, Object> attributes) throws Exception {
|
||||
ServletServerHttpRequest req = (ServletServerHttpRequest)request;
|
||||
ServletServerHttpResponse res = (ServletServerHttpResponse)response;
|
||||
|
||||
HttpServletRequest httpServletRequest = ServletUtils.getRequest();
|
||||
String ip = JakartaServletUtil.getClientIP(httpServletRequest);
|
||||
String token = req.getServletRequest().getParameter("token");
|
||||
log.info("开始建立连接....token:{}", token);
|
||||
log.info("attributes:{}", attributes);
|
||||
if (StrUtil.isBlank(token)) {
|
||||
res.setStatusCode(HttpStatus.UNAUTHORIZED);
|
||||
res.getServletResponse().setContentType("application/json");
|
||||
String errorMessage = "{\"error\": \"Authentication failed. Please provide valid credentials.\"}";
|
||||
res.getBody().write(errorMessage.getBytes());
|
||||
return false;
|
||||
}
|
||||
|
||||
// 鉴权: 如果返回 false 则表示未通过
|
||||
// response.setStatusCode(HttpStatus.UNAUTHORIZED);
|
||||
//返回 false;
|
||||
LoginUser loginUser = LoginHelper.getLoginUser(token);
|
||||
if (loginUser == null) {
|
||||
res.setStatusCode(HttpStatus.UNAUTHORIZED);
|
||||
res.getServletResponse().setContentType("application/json");
|
||||
String errorMessage = "{\"error\": \"Authentication failed. Please provide valid credentials.\"}";
|
||||
res.getBody().write(errorMessage.getBytes());
|
||||
res.close();
|
||||
return false;
|
||||
}
|
||||
attributes.put("userId", String.valueOf(loginUser.getId()));
|
||||
attributes.put("ip", ip);
|
||||
super.setCreateSession(true);
|
||||
return super.beforeHandshake(request, response, wsHandler, attributes);
|
||||
}
|
||||
|
||||
/**
|
||||
* WebSocket 握手成功完成后调用此方法。
|
||||
* 它记录一条消息指示连接已建立,然后调用
|
||||
* 方法的超类实现。
|
||||
*
|
||||
* @param request 表示传入 HTTP 请求的 ServerHttpRequest 对象
|
||||
* @param response 表示传出 HTTP 响应的 ServerHttpResponse 对象
|
||||
* @param wsHandler 将处理 WebSocket 会话的 WebSocketHandler 对象
|
||||
* @param exception 在握手过程中发生的异常,如果没有异常则为 null
|
||||
*/
|
||||
@Override
|
||||
public void afterHandshake(ServerHttpRequest request,
|
||||
ServerHttpResponse response,
|
||||
WebSocketHandler wsHandler,
|
||||
Exception exception) {
|
||||
log.info("连接成功....");
|
||||
//其他业务代码
|
||||
super.afterHandshake(request, response, wsHandler, exception);
|
||||
}
|
||||
}
|
@@ -0,0 +1,113 @@
|
||||
/*
|
||||
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package top.continew.admin.common.handler;
|
||||
|
||||
/**
|
||||
* Created by WeiRan on 2024.03.13 16:41
|
||||
*/
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.socket.CloseStatus;
|
||||
import org.springframework.web.socket.TextMessage;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
import org.springframework.web.socket.handler.TextWebSocketHandler;
|
||||
import top.continew.admin.common.util.WsUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author zhong
|
||||
* webscoket 处理器
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class MyWebSocketHandler extends TextWebSocketHandler {
|
||||
|
||||
/**
|
||||
* 收到客户端消息时触发的回调
|
||||
*
|
||||
* @param session 连接对象
|
||||
* @param message 消息体
|
||||
*/
|
||||
@Override
|
||||
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws IOException {
|
||||
log.info("接受到会话【{}】的消息:{}", session.getId(), message.getPayload());
|
||||
String jsonPayload = message.getPayload();
|
||||
if (StrUtil.isBlank(jsonPayload)) {
|
||||
log.error("接收到空消息");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
//业务逻辑处理
|
||||
// WsUtils.send(session, "我收到了你的消息");
|
||||
} catch (Exception e) {
|
||||
log.error("WebSocket消息解析失败:{}", e.getMessage(), e);
|
||||
WsUtils.close(session, "消息解析失败:" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 建立连接后触发的回调
|
||||
*
|
||||
* @param session 连接对象
|
||||
* @throws Exception
|
||||
*/
|
||||
@Override
|
||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||
String userId = String.valueOf(session.getAttributes().get("userId"));
|
||||
// 将新连接添加
|
||||
WsUtils.bindUser(userId, session);
|
||||
//在线数加1
|
||||
WsUtils.onlineCount.incrementAndGet();
|
||||
log.info("与用户【{}】建立了连接 当前在线人数【{}】", userId, WsUtils.onlineCount.get());
|
||||
log.info("attributes:{}", session.getAttributes());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 断开连接后触发的回调
|
||||
*
|
||||
* @param session 连接对象
|
||||
* @param status 状态
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
@Override
|
||||
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
|
||||
// 关闭连接
|
||||
WsUtils.close(session, "断开连接后触发的回调");
|
||||
log.info("用户【{}】断开连接,status:{},当前剩余在线人数【{}】", WsUtils.getUserId(session), status.getCode(), WsUtils.onlineCount
|
||||
.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* 传输消息出错时触发的回调
|
||||
*
|
||||
* @param session 连接对象
|
||||
* @param exception 异常
|
||||
* @throws Exception 异常
|
||||
*/
|
||||
@Override
|
||||
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
|
||||
log.info("用户【{}】发生错误,exception:{}", session.getId(), exception.getMessage());
|
||||
// 如果发送异常,则断开连接
|
||||
if (session.isOpen()) {
|
||||
WsUtils.close(session, "传输消息出错时触发的回调");
|
||||
}
|
||||
}
|
||||
}
|
@@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package top.continew.admin.common.model.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@Data
|
||||
public class WsMsg implements Serializable {
|
||||
/**
|
||||
* 消息 ID
|
||||
*/
|
||||
public String msgId;
|
||||
|
||||
/**
|
||||
* 发送者 ID
|
||||
*/
|
||||
public String fromId;
|
||||
|
||||
/**
|
||||
* 发送人名称
|
||||
*/
|
||||
public String fromName;
|
||||
|
||||
/**
|
||||
* 接受者ID
|
||||
*/
|
||||
public String toId;
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
public int msgType;
|
||||
|
||||
/**
|
||||
* 发送消息时间戳
|
||||
*/
|
||||
public long sendTime;
|
||||
|
||||
/**
|
||||
* 消息内容
|
||||
*/
|
||||
public String content;
|
||||
|
||||
}
|
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package top.continew.admin.common.util;
|
||||
|
||||
import top.continew.admin.common.enums.MessageTypeEnum;
|
||||
import top.continew.admin.common.model.dto.WsMsg;
|
||||
import top.continew.admin.common.util.helper.LoginHelper;
|
||||
|
||||
public class NoticeMsgUtils {
|
||||
public static WsMsg conversion(String massage, String id) {
|
||||
WsMsg msg = new WsMsg();
|
||||
msg.setMsgId(id);
|
||||
msg.setFromId(LoginHelper.getUserId().toString());
|
||||
msg.setFromName(LoginHelper.getUsername());
|
||||
msg.setContent(massage);
|
||||
msg.setMsgType(MessageTypeEnum.SYSTEM.getValue());
|
||||
msg.setSendTime(System.currentTimeMillis());
|
||||
return msg;
|
||||
}
|
||||
}
|
@@ -0,0 +1,253 @@
|
||||
/*
|
||||
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package top.continew.admin.common.util;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.socket.CloseStatus;
|
||||
import org.springframework.web.socket.TextMessage;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
import top.continew.admin.common.model.dto.WsMsg;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* WebSocket工具类
|
||||
*
|
||||
* @author WeiRan
|
||||
* @since 2024.03.13 16:45
|
||||
*/
|
||||
@Slf4j
|
||||
public class WsUtils {
|
||||
/**
|
||||
* 静态变量,用来记录当前用户的session,线程安全的类。
|
||||
*/
|
||||
public static ConcurrentHashMap<String, WebSocketSession> webSocketSessionMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 静态变量,用来记录当前在线连接数,线程安全的类。
|
||||
*/
|
||||
public static AtomicInteger onlineCount = new AtomicInteger(0);
|
||||
|
||||
/**
|
||||
* 绑定用户连接
|
||||
*
|
||||
* @param userId 用户ID
|
||||
* @param session WebSocketSession信息
|
||||
*/
|
||||
public static void bindUser(String userId, WebSocketSession session) {
|
||||
webSocketSessionMap.put(userId, session);
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除连接
|
||||
*
|
||||
* @param session WebSocketSession信息
|
||||
*/
|
||||
public static void remove(WebSocketSession session) {
|
||||
//在线数减1
|
||||
onlineCount.decrementAndGet();
|
||||
webSocketSessionMap.remove(getUserId(session));
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭 WebSocket 连接的方法。
|
||||
*
|
||||
* @param webSocketSession WebSocketSession信息,表示当前的 WebSocket 连接。
|
||||
* @param msg 发送消息内容,用于记录关闭连接的原因或相关信息。
|
||||
* @throws IOException 当关闭 WebSocket 连接时可能抛出的 IO 异常。
|
||||
*/
|
||||
public static void close(WebSocketSession webSocketSession, String msg) throws IOException {
|
||||
String userId = getUserId(webSocketSession);
|
||||
if (!webSocketSession.isOpen()) {
|
||||
WsUtils.remove(webSocketSession);
|
||||
log.warn("连接对象【{}】已关闭:{}", userId, msg);
|
||||
} else {
|
||||
webSocketSession.close(CloseStatus.SERVER_ERROR);
|
||||
WsUtils.remove(webSocketSession);
|
||||
log.error("服务端主动关闭连接:用户{}信息:{}", userId, msg);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过用户ID获取对应的 WebSocketSession。
|
||||
*
|
||||
* @param userId 用户ID,用于查找对应的 WebSocketSession。
|
||||
* @return WebSocketSession 如果存在与给定用户ID对应的 WebSocketSession,则返回该对象;如果不存在,则返回 null。
|
||||
*/
|
||||
public static WebSocketSession getWebSocketSession(String userId) {
|
||||
return webSocketSessionMap.get(userId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取用户的session
|
||||
*
|
||||
* @param session WebSocketSession对象,包含用户的会话信息
|
||||
* @return 用户的ID,类型为字符串
|
||||
*/
|
||||
public static String getUserId(WebSocketSession session) {
|
||||
return (String)session.getAttributes().get("userId");
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取用户的IP地址
|
||||
*
|
||||
* @param session WebSocketSession对象,包含用户的会话信息
|
||||
* @return 用户的IP地址,类型为字符串
|
||||
*/
|
||||
public static String getIp(WebSocketSession session) {
|
||||
return (String)session.getAttributes().get("ip");
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断链接是否存在
|
||||
*
|
||||
* @param userId 用户ID,用于查找对应的WebSocket会话
|
||||
* @return 如果映射中存在该用户ID对应的链接,返回true;否则返回false
|
||||
*/
|
||||
public static boolean contains(String userId) {
|
||||
return WsUtils.webSocketSessionMap.containsKey(userId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断链接是否存在
|
||||
*
|
||||
* @param webSocketSession WebSocketSession对象,包含用户的会话信息
|
||||
* @return 如果映射中存在该用户ID对应的链接,返回true;否则返回false
|
||||
*/
|
||||
public static boolean contains(WebSocketSession webSocketSession) {
|
||||
return WsUtils.webSocketSessionMap.containsKey(getUserId(webSocketSession));
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有用户连接的用户ID列表。
|
||||
*
|
||||
* @return List<String> 包含所有已连接用户的用户ID的列表。
|
||||
*/
|
||||
public static List<String> getUserList() {
|
||||
return Collections.list(webSocketSessionMap.keys());
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息给指定用户。
|
||||
*
|
||||
* @param userId 用户ID,用于指定消息接收者。
|
||||
* @param message 消息内容,要发送给指定用户的消息。
|
||||
* @throws IOException 当发送消息时可能抛出的 IO 异常。
|
||||
*/
|
||||
public static void sendToUser(String userId, String message) {
|
||||
WebSocketSession webSocketSession = getWebSocketSession(userId);
|
||||
|
||||
if (webSocketSession == null || !webSocketSession.isOpen()) {
|
||||
log.warn("用户【{}】已关闭,无法送消息:{}", userId, message);
|
||||
} else {
|
||||
try {
|
||||
webSocketSession.sendMessage(new TextMessage(message));
|
||||
} catch (IOException e) {
|
||||
log.error("发送消息失败:{}", e.getMessage(), e);
|
||||
}
|
||||
log.info("sendMessage:向{}发送消息:{}", userId, message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息给指定用户。
|
||||
*
|
||||
* @param userId 目标用户的ID。
|
||||
* @param message 要发送的消息对象。
|
||||
* @throws IOException 如果发生IO异常。
|
||||
*/
|
||||
public static void sendToUser(String userId, WsMsg message) {
|
||||
WebSocketSession webSocketSession = getWebSocketSession(userId);
|
||||
if (webSocketSession == null || !webSocketSession.isOpen()) {
|
||||
log.warn("用户【{}】已关闭,无法送消息:{}", userId, JSONObject.toJSONString(message));
|
||||
} else {
|
||||
try {
|
||||
webSocketSession.sendMessage(new TextMessage(JSONObject.toJSONString(message)));
|
||||
} catch (IOException e) {
|
||||
log.error("发送消息失败:{}", e.getMessage(), e);
|
||||
}
|
||||
log.info("sendMessage:向{}发送消息:{}", userId, JSONObject.toJSONString(message));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息全部在线用户
|
||||
*
|
||||
* @param message 发送的消息内容
|
||||
* @throws IOException 如果发生IO异常。
|
||||
*/
|
||||
public static void sendToAll(String message) {
|
||||
getUserList().forEach(userId -> sendToUser(userId, message));
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息全部在线用户
|
||||
*
|
||||
* @param message 发送的消息内容
|
||||
* @throws IOException 如果发生IO异常。
|
||||
*/
|
||||
public static void sendToAll(WsMsg message) {
|
||||
getUserList().forEach(userId -> sendToUser(userId, JSONObject.toJSONString(message)));
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送通过webSocketSession发送消息
|
||||
*
|
||||
* @param webSocketSession 对象id
|
||||
* @param message 发送的消息内容
|
||||
* @throws IOException 如果发生IO异常。
|
||||
*/
|
||||
public static void send(WebSocketSession webSocketSession, String message) {
|
||||
|
||||
if (webSocketSession == null || !webSocketSession.isOpen()) {
|
||||
log.warn("连接对象【{}】已关闭,无法送消息:{}", webSocketSession.getId(), message);
|
||||
} else {
|
||||
try {
|
||||
webSocketSession.sendMessage(new TextMessage(message));
|
||||
} catch (IOException e) {
|
||||
log.error("发送消息失败:{}", e.getMessage(), e);
|
||||
}
|
||||
log.info("sendMessage:向{}发送消息:{}", webSocketSession.getId(), message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过 WebSocketSession 对象发送消息。
|
||||
*
|
||||
* @param webSocketSession 目标 WebSocketSession 对象。
|
||||
* @param wsMsg 要发送的WsMsg消息对象。
|
||||
* @throws IOException 如果发生IO异常。
|
||||
*/
|
||||
public static void send(WebSocketSession webSocketSession, WsMsg wsMsg) {
|
||||
if (webSocketSession == null || !webSocketSession.isOpen()) {
|
||||
log.warn("连接对象【{}】已关闭,无法送消息:{}", webSocketSession.getId(), JSONObject.toJSONString(wsMsg));
|
||||
} else {
|
||||
try {
|
||||
webSocketSession.sendMessage(new TextMessage(JSONObject.toJSONString(wsMsg)));
|
||||
} catch (IOException e) {
|
||||
log.error("发送消息失败:{}", e.getMessage(), e);
|
||||
}
|
||||
log.info("sendMessage:向{}发送消息:{}", webSocketSession.getId(), JSONObject.toJSONString(wsMsg));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@@ -24,6 +24,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import top.continew.admin.common.util.helper.LoginHelper;
|
||||
import top.continew.admin.system.mapper.MessageMapper;
|
||||
import top.continew.admin.system.model.entity.MessageDO;
|
||||
import top.continew.admin.system.model.query.MessageQuery;
|
||||
@@ -66,6 +67,7 @@ public class MessageServiceImpl implements MessageService {
|
||||
public void add(MessageReq req, List<Long> userIdList) {
|
||||
CheckUtils.throwIf(() -> CollUtil.isEmpty(userIdList), "消息接收人不能为空");
|
||||
MessageDO message = BeanUtil.copyProperties(req, MessageDO.class);
|
||||
message.setCreateUser(LoginHelper.getUserId());
|
||||
baseMapper.insert(message);
|
||||
messageUserService.add(message.getId(), userIdList);
|
||||
}
|
||||
|
@@ -20,6 +20,7 @@ import cn.hutool.core.collection.CollUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Service;
|
||||
import top.continew.admin.common.enums.MessageTypeEnum;
|
||||
import top.continew.admin.common.util.helper.LoginHelper;
|
||||
import top.continew.admin.system.mapper.MessageUserMapper;
|
||||
import top.continew.admin.system.model.entity.MessageUserDO;
|
||||
import top.continew.admin.system.model.resp.MessageTypeUnreadResp;
|
||||
@@ -84,6 +85,7 @@ public class MessageUserServiceImpl implements MessageUserService {
|
||||
.set(MessageUserDO::getIsRead, true)
|
||||
.set(MessageUserDO::getReadTime, LocalDateTime.now())
|
||||
.eq(MessageUserDO::getIsRead, false)
|
||||
.eq(MessageUserDO::getUserId, LoginHelper.getUserId())
|
||||
.in(CollUtil.isNotEmpty(ids), MessageUserDO::getMessageId, ids)
|
||||
.update();
|
||||
}
|
||||
|
@@ -16,6 +16,7 @@
|
||||
|
||||
package top.continew.admin.webapi.system;
|
||||
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.enums.ParameterIn;
|
||||
@@ -23,8 +24,12 @@ import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import top.continew.admin.common.enums.MessageTypeEnum;
|
||||
import top.continew.admin.common.util.NoticeMsgUtils;
|
||||
import top.continew.admin.common.util.WsUtils;
|
||||
import top.continew.admin.common.util.helper.LoginHelper;
|
||||
import top.continew.admin.system.model.query.MessageQuery;
|
||||
import top.continew.admin.system.model.req.MessageReq;
|
||||
import top.continew.admin.system.model.resp.MessageResp;
|
||||
import top.continew.admin.system.model.resp.MessageUnreadResp;
|
||||
import top.continew.admin.system.service.MessageService;
|
||||
@@ -34,6 +39,7 @@ import top.continew.starter.extension.crud.model.resp.PageResp;
|
||||
import top.continew.starter.web.model.R;
|
||||
import top.continew.starter.log.core.annotation.Log;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@@ -81,4 +87,17 @@ public class MessageController {
|
||||
public R<MessageUnreadResp> countUnreadMessage(@RequestParam(required = false) Boolean detail) {
|
||||
return R.ok(messageUserService.countUnreadMessageByUserId(LoginHelper.getUserId(), detail));
|
||||
}
|
||||
|
||||
@GetMapping("/testSend")
|
||||
public R<Object> testSend(String msg) {
|
||||
List<Long> userIdList = new ArrayList<>();
|
||||
userIdList.add(LoginHelper.getUserId());
|
||||
MessageReq req = new MessageReq();
|
||||
req.setTitle(msg);
|
||||
req.setContent(msg);
|
||||
req.setType(MessageTypeEnum.SYSTEM);
|
||||
baseService.add(req, userIdList);
|
||||
WsUtils.sendToUser(LoginHelper.getUserId().toString(), NoticeMsgUtils.conversion(msg, IdUtil.fastSimpleUUID()));
|
||||
return R.ok();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user