基于Spring Websocket STOMP协议在线聊天核心功能的实现
Wucheng

背景

接了高中同学朋友的一个毕设单子,帮他修复问题(他Vue Router的路由守卫没有检查,一直无限重定向了),然后顺便再开发一个心理咨询项目的在线聊天咨询的功能。

本来考虑直接使用Websocket,但是调研后发现基于Websocket的STOMP协议似乎更容易实现功能,由此记录一下聊天功能的实现。

由于是毕设简单演示,消息的存储并未放在任何数据库中(其实只是懒),仅使用localstore进行临时存储。由于是单子项目并未上传到任何仓库中。

STOMP协议

STOMP (Simple Text Oriented Messaging Protocol) 是一个简单的面向文本的消息传递协议。它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理进行交互(从不知道哪抄的)。

STOMP理解下来大概就是消息订阅发布,大致流程就是

  1. 客户端首先根据自己的id订阅了自己的消息(频道)
  2. 消息发送方消息发送消息到服务器,里面携带了接收方的id
  3. 服务器接收消息后,根据收到的接收方id找到该频道投送消息
  4. 接收方发现自己订阅的频道有消息进行接收

STOMP命令类型:

  • CONNECT:建立连接
  • SEND:发送消息
  • SUBSCRIBE:订阅消息
  • DISCONNECT:断开连接

项目实现

核心配置 - WebsocketConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
// 注册STOMP端点
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").setAllowedOrigins("*");
}

// 配置消息代理
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app")
.setUserDestinationPrefix("/user")
.enableSimpleBroker("/topic", "/queue");
}

// 添加拦截器(下文中)
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {

registration.interceptors(authChannelInterceptor);
}
}

配置说明:

  • /ws:WebSocket连接端点
  • /app:应用程序目的地前缀,用于路由到@MessageMapping方法
  • /user:用户目的地前缀,用于点对点消息
  • /topic:用于广播消息
  • /queue:用于点对点消息

身份认证拦截器 - AuthChannelInterceptor

该拦截器实现了完整的用户身份验证和在线状态管理:

拦截器需要继承ChannelInterceptor,然后在配置类中进行注册即可使用

连接认证流程:

  1. Token验证:从请求头中获取Authorization token
  2. Redis验证:检查token在Redis中是否存在
  3. JWT解析:解析token获取用户信息
  4. 设置Principal:为WebSocket会话设置用户身份
  5. 在线状态管理:将用户添加到Redis集合中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Component
public class AuthChannelInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
// 对于非CONNECT命令,放行请求
if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
// 获取token
List<String> authorization = accessor.getNativeHeader("Authorization");
try {
// 验证Redis中是否存在token
String redisToken = operations.get(token);
if (redisToken == null) {
return null; // token无效
}
// 解析token获取用户信息
Map<String, Object> userInfo = JwtUtil.parseToken(redisToken);
// 设置用户Principal
Principal principal = () -> userInfo.get("id")+"_" + userInfo.get("username");
accessor.setUser(principal);
// 存储用户信息到ThreadLocal,便于后续处理
ThreadLocalUtil.set(userInfo);

// 将用户添加到 Redis 集合
String role = String.valueOf(userInfo.get("role"));
String userId = String.valueOf(userInfo.get("id"));
Object username = userInfo.get("username");
// 将信息放入redis,以便后续获取在线列表
template.opsForSet().add("身份", userId+"_" + username);
return message;
} catch (Exception e) {
e.printStackTrace();
return null; // 认证失败,返回null
}
}
return message;
}

// 下面还有断开连接的逻辑
}

断开连接处理:

1
2
3
4
5
6
7
8
9
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
if (accessor != null && StompCommand.DISCONNECT.equals(accessor.getCommand())) {
// 清除ThreadLocal
ThreadLocalUtil.remove();

// 从Redis集合中移除用户状态
template.opsForSet().remove("student", userId+"_" + username);
}
}

消息收发机制

消息数据结构

1
2
3
4
5
6
7
8
@Data
public class ChatMessage {
private Integer from; // 发送者ID
private String fromName; // 发送者用户名
private String to; // 接收者ID
private String content; // 消息内容
private LocalDateTime sendTime; // 发送时间
}

消息发送 - ChatMessageHandler

核心发送方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@MessageMapping("/chat.send")
public void handleChatMessage(Principal principal, @Payload ChatMessage message) {
// 1. 从Principal获取发送者信息
String fromInfo = principal.getName(); // 格式:userId_username
String[] split = fromInfo.split("_");
message.setFrom(Integer.parseInt(split[0]));
message.setFromName(split[1]);

// 2. 设置发送时间
message.setSendTime(LocalDateTime.now());

// 3. 发送点对点消息
messagingTemplate.convertAndSendToUser(
message.getTo(), // 接收者ID
"/queue/messages", // 目的地
message // 消息内容
);
}

消息接收

客户端需要订阅以下地址来接收消息:

  • 个人消息队列/user/queue/messages
  • 广播消息/topic/broadcast

前端首先需要导入

pnpm install @stomp/stompjs ws

先创建一个配置客户端配置的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36


客户端创建函数
export function createStompClient(options = {}) {
// 获取token
const token = localStorage.getItem('token') || ''
// 创建STOMP客户端
const client = new Client({
// STOMP服务端点
brokerURL: 'ws://localhost:8080/ws',
// 添加token到连接头信息
connectHeaders: {
Authorization: token
},
// 自动重连设置
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
// STOMP连接回调
onConnect: options.onConnect || (() => console.log('STOMP连接成功')),
// 断开连接回调
onDisconnect: options.onDisconnect || (() => console.log('STOMP连接已断开')),
// 连接错误回调
onWebSocketError: options.onError || ((error) => console.error('WebSocket 连接错误:', error)),
// STOMP错误处理
onStompError: options.onStompError || ((frame) => console.error('STOMP 错误:', frame)),
// 调试日志设置
debug: (msg) => {
if (process.env.NODE_ENV !== 'production') {
console.debug(msg)
}
}
})

return client
}

然后找个地方放STOMP客户端,我直接放在store里面配合其他函数方便使用消息收发了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 建立STOMP连接
const connectStomp = () => {
// 如果已经有连接,先断开
if (stompClient) {
disconnectStomp();
}
try {
// 创建STOMP客户端
stompClient = createStompClient({
// 这里是以上函数的回调函数
});
// 激活连接
stompClient.activate();
} catch (error) {
console.error("创建STOMP连接失败:", error);
}
};
// 断开STOMP连接
const disconnectStomp = () => {
if (stompClient && isConnected.value) {
stompClient.deactivate();
stompClient = null;
isConnected = false;
}
};

消息收发函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

// 订阅自己的消息
const subscribeToMessages = () => {
if (stompClient && isConnected.value) {
stompClient.subscribe("/user/queue/messages", (message) => {
const msg = JSON.parse(message.body);
console.log("收到消息:", msg);
handleReceiveMessage(msg);
});
}
};

const sendMessage = (message) => {
if(isConnected.value){
// 发送消息到服务器
stompClient.publish({
destination: "/app/chat.send",
body: JSON.stringify(message),
headers: {
"content-type": "application/json",
},
});
}
}

在线用户通知

由于是配合Redis,所以如果需要获取在线咨询师列表的话,我们可以直接从Redis客户端即取。

比如在载入页面的时候调用后端接口,直接展示出咨询师的列表。由于使用的是消息订阅发布模型。

对于后续有新咨询师上线,可以在上线后直接通过广播 /topic直接进行发布消息,客户端也只需要进行订阅然后进行更新本地聊天列表即可。