基于Spring Websocket STOMP协议在线聊天核心功能的实现
背景
接了高中同学朋友的一个毕设单子,帮他修复问题(他Vue Router的路由守卫没有检查,一直无限重定向了),然后顺便再开发一个心理咨询项目的在线聊天咨询的功能。
本来考虑直接使用Websocket,但是调研后发现基于Websocket的STOMP协议似乎更容易实现功能,由此记录一下聊天功能的实现。
由于是毕设简单演示,消息的存储并未放在任何数据库中(其实只是懒),仅使用localstore进行临时存储。由于是单子项目并未上传到任何仓库中。
STOMP协议
STOMP (Simple Text Oriented Messaging Protocol) 是一个简单的面向文本的消息传递协议。它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理进行交互(从不知道哪抄的)。
STOMP理解下来大概就是消息订阅发布,大致流程就是
- 客户端首先根据自己的id订阅了自己的消息(频道)
- 消息发送方消息发送消息到服务器,里面携带了接收方的id
- 服务器接收消息后,根据收到的接收方id找到该频道投送消息
- 接收方发现自己订阅的频道有消息进行接收
STOMP命令类型:
CONNECT
:建立连接
SEND
:发送消息
SUBSCRIBE
:订阅消息
DISCONNECT
:断开连接
项目实现
核心配置 - WebsocketConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Configuration @EnableWebSocketMessageBroker public class WebsocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws").setAllowedOrigins("*"); }
@Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.setApplicationDestinationPrefixes("/app") .setUserDestinationPrefix("/user") .enableSimpleBroker("/topic", "/queue"); }
@Override public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(authChannelInterceptor); } }
|
配置说明:
/ws
:WebSocket连接端点
/app
:应用程序目的地前缀,用于路由到@MessageMapping方法
/user
:用户目的地前缀,用于点对点消息
/topic
:用于广播消息
/queue
:用于点对点消息
身份认证拦截器 - AuthChannelInterceptor
该拦截器实现了完整的用户身份验证和在线状态管理:
拦截器需要继承ChannelInterceptor,然后在配置类中进行注册即可使用
连接认证流程:
- Token验证:从请求头中获取Authorization token
- Redis验证:检查token在Redis中是否存在
- JWT解析:解析token获取用户信息
- 设置Principal:为WebSocket会话设置用户身份
- 在线状态管理:将用户添加到Redis集合中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @Component public class AuthChannelInterceptor implements ChannelInterceptor { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) { List<String> authorization = accessor.getNativeHeader("Authorization"); try { String redisToken = operations.get(token); if (redisToken == null) { return null; } Map<String, Object> userInfo = JwtUtil.parseToken(redisToken); Principal principal = () -> userInfo.get("id")+"_" + userInfo.get("username"); accessor.setUser(principal); ThreadLocalUtil.set(userInfo);
String role = String.valueOf(userInfo.get("role")); String userId = String.valueOf(userInfo.get("id")); Object username = userInfo.get("username"); template.opsForSet().add("身份", userId+"_" + username); return message; } catch (Exception e) { e.printStackTrace(); return null; } } return message; }
}
|
断开连接处理:
1 2 3 4 5 6 7 8 9
| public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { if (accessor != null && StompCommand.DISCONNECT.equals(accessor.getCommand())) { ThreadLocalUtil.remove();
template.opsForSet().remove("student", userId+"_" + username); } }
|
消息收发机制
消息数据结构
1 2 3 4 5 6 7 8
| @Data public class ChatMessage { private Integer from; private String fromName; private String to; private String content; private LocalDateTime sendTime; }
|
消息发送 - ChatMessageHandler
核心发送方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @MessageMapping("/chat.send") public void handleChatMessage(Principal principal, @Payload ChatMessage message) { String fromInfo = principal.getName(); String[] split = fromInfo.split("_"); message.setFrom(Integer.parseInt(split[0])); message.setFromName(split[1]);
message.setSendTime(LocalDateTime.now());
messagingTemplate.convertAndSendToUser( message.getTo(), "/queue/messages", message ); }
|
消息接收
客户端需要订阅以下地址来接收消息:
- 个人消息队列:
/user/queue/messages
- 广播消息:
/topic/broadcast
前端首先需要导入
pnpm install @stomp/stompjs ws
先创建一个配置客户端配置的函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
客户端创建函数 export function createStompClient(options = {}) { const token = localStorage.getItem('token') || '' const client = new Client({ brokerURL: 'ws://localhost:8080/ws', connectHeaders: { Authorization: token }, reconnectDelay: 5000, heartbeatIncoming: 4000, heartbeatOutgoing: 4000, onConnect: options.onConnect || (() => console.log('STOMP连接成功')), onDisconnect: options.onDisconnect || (() => console.log('STOMP连接已断开')), onWebSocketError: options.onError || ((error) => console.error('WebSocket 连接错误:', error)), onStompError: options.onStompError || ((frame) => console.error('STOMP 错误:', frame)), debug: (msg) => { if (process.env.NODE_ENV !== 'production') { console.debug(msg) } } }) return client }
|
然后找个地方放STOMP客户端,我直接放在store里面配合其他函数方便使用消息收发了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| const connectStomp = () => { if (stompClient) { disconnectStomp(); } try { stompClient = createStompClient({ }); stompClient.activate(); } catch (error) { console.error("创建STOMP连接失败:", error); } };
const disconnectStomp = () => { if (stompClient && isConnected.value) { stompClient.deactivate(); stompClient = null; isConnected = false; } };
|
消息收发函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
const subscribeToMessages = () => { if (stompClient && isConnected.value) { stompClient.subscribe("/user/queue/messages", (message) => { const msg = JSON.parse(message.body); console.log("收到消息:", msg); handleReceiveMessage(msg); }); } };
const sendMessage = (message) => { if(isConnected.value){ stompClient.publish({ destination: "/app/chat.send", body: JSON.stringify(message), headers: { "content-type": "application/json", }, }); } }
|
在线用户通知
由于是配合Redis,所以如果需要获取在线咨询师列表的话,我们可以直接从Redis客户端即取。
比如在载入页面的时候调用后端接口,直接展示出咨询师的列表。由于使用的是消息订阅发布模型。
对于后续有新咨询师上线,可以在上线后直接通过广播 /topic
直接进行发布消息,客户端也只需要进行订阅然后进行更新本地聊天列表即可。