SpringBoot整合WebSocket实践

SpringBoot整合WebSocket实践,第1张

简介

先来看下维基百科WebSocket的简介:

WebSocket是一种与HTTP不同的协议。两者都位于OSI模型的应用层,并且都依赖于传输层的TCP协议。 虽然它们不同,但是RFC 6455中规定:it is designed to work over HTTP ports 80 and 443 as well as to support HTTP proxies and intermediaries(WebSocket通过HTTP端口80和443进行工作,并支持HTTP代理和中介),从而使其与HTTP协议兼容。 为了实现兼容性,WebSocket握手使用HTTP Upgrade头[1]从HTTP协议更改为WebSocket协议。

WebSocket协议支持Web浏览器(或其他客户端应用程序)与Web服务器之间的交互,具有较低的开销,便于实现客户端与服务器的实时数据传输。 服务器可以通过标准化的方式来实现,而无需客户端首先请求内容,并允许消息在保持连接打开的同时来回传递。通过这种方式,可以在客户端和服务器之间进行双向持续对话。 通信通过TCP端口80或443完成,这在防火墙阻止非Web网络连接的环境下是有益的。另外,Comet之类的技术以非标准化的方式实现了类似的双向通信。

大多数浏览器都支持该协议,包括Google Chrome、Firefox、Safari、Microsoft Edge、Internet Explorer和Opera。

WebSocket协议规范将ws(WebSocket)和wss(WebSocket Secure)定义为两个新的统一资源标识符(URI)方案,分别对应明文和加密连接。除了方案名称和片段ID(不支持#)之外,其余的URI组件都被定义为此URI的通用语法。

使用浏览器开发人员工具,开发人员可以检查WebSocket握手以及WebSocket框架。

WebSocket用于前端(Web浏览器)和后端(Web服务器)保持长时间连接及数据实时传输。

客户端首先请求内容,并允许消息在保持连接打开的同时来回传递,通过这种方式,可以在客户端和服务器之间进行双向持续对话。

扩展:维持http长连接的几种方式有以下几种,具体可网上搜索相关实现,这里只介绍WebSocket保持长连接。

  • ajax 轮询
  • long poll 长轮询
  • iframe 长连接
  • WebSocket.

以下开始SpringBoot整合WebSocket

引入依赖
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-webartifactId>
    dependency>
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-websocketartifactId>
    dependency>
    <dependency>
        <groupId>org.projectlombokgroupId>
        <artifactId>lombokartifactId>
        <optional>trueoptional>
    dependency>
    <dependency>
        <groupId>com.alibabagroupId>
        <artifactId>fastjsonartifactId>
        <version>1.2.75version>
    dependency>
    <dependency>
        <groupId>cn.hutoolgroupId>
        <artifactId>hutool-allartifactId>
        <version>5.7.9version>
    dependency>
开启WebSocket支持
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 开启WebSocket支持
 *
 * @author wave-muly
 * @date 2021/6/21 下午5:01
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
会话 *** 作接口封装

如果需要客户端服务端消息传输,WebSocket提供了javax.websocket.Session类,这里只用到其中几个方法

  • socketChannel.getBasicRemote().sendText(msg);
  • socketChannel.getAsyncRemote().sendText(msg);
  • socketChannel.isOpen();
  • socketChannel.close();

这里定义四个方法,主要是对javax.websocket.Session类 *** 作的封装

以下是该接口的包装实现类

/**
 * socket会话 *** 作接口
 * 

* 该接口面向会话,须基于会话的通道调用。 * 该接口支持扩展,可参考WebSocket模块中{@link com.muly.wave.socket.websocket.operator.channel}包下的类 * * @author wave-muly * @date 2021/6/1 上午11:46 */ public interface SocketSessionOperatorApi { /** * 写出数据,经过责任链 * * @author wave-muly * @date 2021/6/1 上午11:48 **/ void writeAndFlush(Object obj); /** * 写出数据,不经过责任链 * * @author wave-muly * @date 2021/6/1 上午11:48 **/ void writeToChannel(Object obj); /** * 关闭会话 * * @author wave-muly * @date 2021/6/1 上午11:48 **/ void close(); /** * 是否存活 * * @return {@link boolean} * @author wave-muly * @date 2021/6/1 上午11:50 **/ boolean isInvalid(); }

import com.muly.wave.socket.api.session.SocketSessionOperatorApi;

/**
 * 对Api模块的 *** 作类进行扩展
 * 

* 暂时只写接口,SocketOperatorApi方法不够用时再对此类进行扩展 * * @author wave-muly * @date 2021/6/1 下午3:44 */ public interface SocketChannelExpandInterFace extends SocketSessionOperatorApi { }

import com.alibaba.fastjson.JSON;

import javax.websocket.Session;
import java.io.IOException;

/**
 * Socket *** 作类实现
 * 

* 简单封装Spring Boot的默认WebSocket * * @author wave-muly * @date 2021/6/1 下午3:41 */ public class WaveSocketOperator implements SocketChannelExpandInterFace { /** * 实际 *** 作的通道 */ private Session socketChannel; public WaveSocketOperator(Session socketChannel) { this.socketChannel = socketChannel; } @Override public void writeAndFlush(Object obj) { try { if (socketChannel.isOpen()) { socketChannel.getBasicRemote().sendText(JSON.toJSONString(obj)); } } catch (IOException e) { e.printStackTrace(); } } @Override public void writeToChannel(Object obj) { if (socketChannel.isOpen()) { socketChannel.getAsyncRemote().sendText(JSON.toJSONString(obj)); } } @Override public void close() { try { if (socketChannel.isOpen()) { socketChannel.close(); } } catch (IOException e) { e.printStackTrace(); } } @Override public boolean isInvalid() { return socketChannel.isOpen(); } }

SocketSession

该类持有会话 *** 作接口SocketSessionOperatorApi

import com.muly.wave.socket.api.session.SocketSessionOperatorApi;
import lombok.Data;

/**
 * Socket会话
 *
 * @author wave-muly
 * @date 2021/6/1 上午11:28
 */
@Data
public class SocketSession<T extends SocketSessionOperatorApi> {

    /**
     * 会话ID,每一个新建的会话都有(目前使用通道ID)
     */
    private String sessionId;

    /**
     * 会话唯一标识
     */
    private String userId;

    /**
     * 该会话监听的消息类型
     */
    private String messageType;

    /**
     * token信息
     */
    private String token;

    /**
     * 连接时间
     */
    private Long connectionTime;

    /**
     * 最后活跃时间
     */
    private Long lastActiveTime;

    /**
     *  *** 作API
     */
    private T socketOperatorApi;

    /**
     * 自定义数据
     */
    private Object data;

}

接下来再定义个存储SocketSession的会话中心类

SessionCenter会话中心
import cn.hutool.core.util.ObjectUtil;
import com.muly.wave.socket.api.session.pojo.SocketSession;
import com.muly.wave.socket.websocket.operator.channel.WaveSocketOperator;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * 会话中心
 * 

* 维护所有的会话 * * @author wave-muly * @date 2021/6/1 下午1:43 */ public class SessionCenter { /** * 所有用户会话维护 */ private static ConcurrentMap<String, List<SocketSession<WaveSocketOperator>>> socketSessionMap = new ConcurrentHashMap<>(); /** * 获取维护的所有会话 * * @return {@link ConcurrentMap< String, SocketSession< WaveSocketOperator >>} * @author wave-muly * @date 2021/6/1 下午2:13 **/ public static ConcurrentMap<String, List<SocketSession<WaveSocketOperator>>> getSocketSessionMap() { return socketSessionMap; } /** * 根据用户ID获取会话信息列表 * * @param userId 用户ID * @return {@link SocketSession } * @author wave-muly * @date 2021/6/1 下午1:48 **/ public static List<SocketSession<WaveSocketOperator>> getSessionByUserId(String userId) { return socketSessionMap.get(userId); } /** * 根据用户ID和消息类型获取会话信息列表 * * @param userId 用户ID * @return {@link SocketSession } * @author wave-muly * @date 2021/6/1 下午1:48 **/ public static List<SocketSession<WaveSocketOperator>> getSessionByUserIdAndMsgType(String userId) { return socketSessionMap.get(userId); } /** * 根据会话ID获取会话信息 * * @param sessionId 会话ID * @return {@link SocketSession } * @author wave-muly * @date 2021/6/1 下午1:48 **/ public static SocketSession<WaveSocketOperator> getSessionBySessionId(String sessionId) { for (List<SocketSession<WaveSocketOperator>> values : socketSessionMap.values()) { for (SocketSession<WaveSocketOperator> session : values) { if (sessionId.equals(session.getSessionId())) { return session; } } } return null; } /** * 设置会话 * * @param socketSession 会话详情 * @author wave-muly * @date 2021/6/1 下午1:49 **/ public static void addSocketSession(SocketSession<WaveSocketOperator> socketSession) { List<SocketSession<WaveSocketOperator>> socketSessions = socketSessionMap.get(socketSession.getUserId()); if (ObjectUtil.isEmpty(socketSessions)) { socketSessions = Collections.synchronizedList(new ArrayList<>()); socketSessionMap.put(socketSession.getUserId(), socketSessions); } socketSessions.add(socketSession); } /** * 连接关闭 * * @param sessionId 会话ID * @author wave-muly * @date 2021/6/1 下午3:25 **/ public static void closed(String sessionId) { Set<Map.Entry<String, List<SocketSession<WaveSocketOperator>>>> entrySet = socketSessionMap.entrySet(); Iterator<Map.Entry<String, List<SocketSession<WaveSocketOperator>>>> iterator = entrySet.iterator(); while (iterator.hasNext()) { Map.Entry<String, List<SocketSession<WaveSocketOperator>>> next = iterator.next(); List<SocketSession<WaveSocketOperator>> value = next.getValue(); if (ObjectUtil.isNotEmpty(value)) { value.removeIf(GunsSocketOperatorSocketSession -> GunsSocketOperatorSocketSession.getSessionId().equals(sessionId)); } } } }

这个类的主要作用是所有用户会话的维护,存储在支持并发访问的ConcurrentHashMap里,定义的方法主要是用来 *** 作map来实现会话的加入、关闭及查询。

Socket通用 *** 作类

有了 *** 作javax.websocket.Session类及会话中心,现在封装一个服务端通过会话中心拿到Session给客户端发送消息的接口.

定义一个SocketOperatorApi接口:

import com.muly.wave.socket.api.exception.SocketException;
import com.muly.wave.socket.api.message.SocketMsgCallbackInterface;

/**
 * Socket通用 *** 作类
 * 

* 可通过该类直接发送消息,每一个Socket实现的子模块必须实现该接口,以提供统一的 *** 作API * * @author wave-muly * @date 2021/6/2 上午9:25 */ public interface SocketOperatorApi { /** * 发送消息到指定会话 * * @param msgType 消息类型可参考{@link com.muly.wave.socket.api.enums}枚举类 * @param sessionId 会话ID * @param msg 消息体 * @author wave-muly * @date 2021/6/11 下午2:19 **/ void sendMsgOfUserSessionBySessionId(String msgType, String sessionId, Object msg) throws SocketException; /** * 发送消息到指定用户的所有会话 *

* 如果用户同一个消息类型建立了多个会话,则统一全部发送 * * @param msgType 消息类型可参考{@link com.muly.wave.socket.api.enums}枚举类 * @param userId 用户ID * @param msg 消息体 * @author wave-muly * @date 2021/6/2 上午9:35 **/ void sendMsgOfUserSession(String msgType, String userId, Object msg) throws SocketException; /** * 发送消息到所有会话 * * @param msgType 消息类型可参考{@link com.muly.wave.socket.api.enums}枚举类 * @param msg 消息体 * @author wave-muly * @date 2021/6/2 上午9:35 **/ void sendMsgOfAllUserSession(String msgType, Object msg); /** * 根据会话id关闭会话 * * @param socketId 会话id * @author wave-muly * @date 2021/8/13 16:00 **/ void closeSocketBySocketId(String socketId); /** * 监听指定类型消息 *

* 1.该方法每调用一次即注册一个监听,同一个消息类型多次调用只有最后一次生效 * * @param msgType 消息类型可参考{@link com.muly.wave.socket.api.enums}枚举类 * @param callbackInterface 消息监听器 * @author wave-muly * @date 2021/6/2 上午9:54 **/ void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface); }

以下为该接口的实现类,该实现类主要用于服务端向客户端发送消息

import cn.hutool.core.util.ObjectUtil;
import com.muly.wave.socket.api.SocketOperatorApi;
import com.muly.wave.socket.api.exception.SocketException;
import com.muly.wave.socket.api.exception.enums.SocketExceptionEnum;
import com.muly.wave.socket.api.message.SocketMsgCallbackInterface;
import com.muly.wave.socket.api.session.pojo.SocketSession;
import com.muly.wave.socket.websocket.message.SocketMessageCenter;
import com.muly.wave.socket.websocket.operator.channel.WaveSocketOperator;
import com.muly.wave.socket.websocket.pojo.WebSocketMessageDTO;
import com.muly.wave.socket.websocket.session.SessionCenter;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.List;

/**
 * WebSocket *** 作实现类
 * 

* 如果是Spring boot项目,通过注入SocketOperatorApi接口 *** 作socket,需将本来交给Spring管理 * * @author wave-muly * @date 2021/6/2 上午10:41 */ @Component public class WebSocketOperator implements SocketOperatorApi { @Override public void sendMsgOfUserSessionBySessionId(String msgType, String sessionId, Object msg) throws SocketException { SocketSession<WaveSocketOperator> session = SessionCenter.getSessionBySessionId(sessionId); if (ObjectUtil.isEmpty(session)) { throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST); } WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO(); webSocketMessageDTO.setData(msg); webSocketMessageDTO.setServerMsgType(msgType); session.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO); } @Override public void sendMsgOfUserSession(String msgType, String userId, Object msg) throws SocketException { // 根据用户ID获取会话 List<SocketSession<WaveSocketOperator>> socketSessionList = SessionCenter.getSessionByUserIdAndMsgType(userId); if (ObjectUtil.isEmpty(socketSessionList)) { throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST); } WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO(); webSocketMessageDTO.setData(msg); webSocketMessageDTO.setServerMsgType(msgType); for (SocketSession<WaveSocketOperator> session : socketSessionList) { // 发送内容 session.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO); } } @Override public void sendMsgOfAllUserSession(String msgType, Object msg) { Collection<List<SocketSession<WaveSocketOperator>>> values = SessionCenter.getSocketSessionMap().values(); WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO(); webSocketMessageDTO.setData(msg); webSocketMessageDTO.setServerMsgType(msgType); for (List<SocketSession<WaveSocketOperator>> sessions : values) { for (SocketSession<WaveSocketOperator> session : sessions) { // 找到该类型的通道 if (session.getMessageType().equals(msgType)) { session.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO); } } } } @Override public void closeSocketBySocketId(String socketId) { SessionCenter.closed(socketId); } @Override public void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface) { SocketMessageCenter.setMessageListener(msgType, callbackInterface); } }

该接口涉及到一个WebSocket交互通用对象

import com.muly.wave.socket.api.SocketOperatorApi;
import lombok.Data;

/**
 * WebSocket交互通用对象
 *
 * @author wave-muly
 * @date 2021/6/1 下午2:56
 */
@Data
public class WebSocketMessageDTO {

    /**
     * 服务端发送的消息类型(客户端如果需要监听该消息类型,注册对应的消息处理器即可)
     */
    private String serverMsgType;

    /**
     * 客户端发送的消息类型(服务端需要处理的消息类型)
     */
    private String clientMsgType;

    /**
     * 目标Id
     */
    private String toUserId;

    /**
     * 发送者ID
     */
    private String formUserId;

    /**
     * 具体发送的数据
     */
    private Object data;

}

鉴于以上多次提到了,msgType这个属性,这个类型定义了消息类型,用于区分服务端与客户端之间消息通讯的类型

以下定义了三种类型的枚举方便管理,

  • SystemMessageTypeEnum

    该枚举适用于服务端监听首次连接和断开连接,以S00开头

    import lombok.Getter;
    
    /**
     * 服务端监听器枚举
     * 

    * 说明:该枚举适用于服务端监听首次连接和断开连接 * * @author wave-muly * @date 2021/6/3 上午9:14 */ @Getter public enum SystemMessageTypeEnum { /** * 监听首次连接 */ SYS_LISTENER_ONOPEN("S00001", "监听首次连接"), /** * 监听断开连接 */ SYS_LISTENER_ONCLOSE("S00002", "监听断开连接"), /** * 监听异常信息 */ SYS_LISTENER_ONERROR("S00003", "监听异常信息"); private final String code; private final String name; SystemMessageTypeEnum(String code, String name) { this.code = code; this.name = name; } }

  • ServerMessageTypeEnum

    该枚举适用于服务器推送给客户端消息时使用,以100开头

    import lombok.Getter;
    
    /**
     * 服务端消息类型枚举
     * 

    * 说明:该枚举适用于服务器推送给客户端消息时使用 * * @author wave-muly * @date 2021/6/3 上午9:14 */ @Getter public enum ServerMessageTypeEnum { /** * 系统通知消息类型 */ SYS_NOTICE_MSG_TYPE("100001", "系统通知消息类型"), /** * 连接消息回复 */ SYS_REPLY_MSG_TYPE("100002", "连接消息回复"); private final String code; private final String name; ServerMessageTypeEnum(String code, String name) { this.code = code; this.name = name; } }

  • ClientMessageTypeEnum

    该枚举适用于服务器接收到客户端发来的消息,判断消息类型时使用,以200开头

    import lombok.Getter;
    
    /**
     * 客户端消息类型枚举
     * 

    * 说明:该枚举适用于服务器接收到客户端发来的消息,判断消息类型时使用 * * @author wave-muly * @date 2021/6/3 上午9:14 */ @Getter public enum ClientMessageTypeEnum { /** * 用户连接鉴权 */ USER_CONNECTION_AUTHENTICATION("200000", "用户连接鉴权"), /** * 用户心跳消息类型 */ USER_HEART("299999", "用户心跳消息类型"); private final String code; private final String name; ClientMessageTypeEnum(String code, String name) { this.code = code; this.name = name; } }

    特殊说明一下serverMsgTypeclientMsgType的区别
    1.serverMsgType字段是服务端发送给客户端的字段
    2.clientMsgType字段是客户端发送给服务器的字段
    例如:客户端发送给服务器一个心跳消息(type:299999),服务端如果需要处理该消息就注册一个该消息的监听器,那么收到消息服务端会把消息推送给对应的监听器,接口见{@link SocketOperatorApi#msgTypeCallback}

上述四个方法中,还涉及到一个方法void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface);

其中SocketMsgCallbackInterface为Socket消息接收回调接口,根据枚举类型注入相应的消息处理方法,

import com.muly.wave.socket.api.session.pojo.SocketSession;

/**
 * Socket消息接收回调接口
 *
 * @author wave-muly
 * @date 2021/6/2 上午9:53
 */
@FunctionalInterface
public interface SocketMsgCallbackInterface {

    /**
     * 收到消息的回调
     *
     * @param msgType       消息类型
     * @param msg           消息体
     * @param socketSession 本次通信的会话
     * @author wave-muly
     * @date 2021/6/2 上午9:51
     **/
    void callback(String msgType, Object msg, SocketSession socketSession);
}

该接口是一个函数式接口,可以使用java8的lambda表达式注入回调方法。

再次构造个消息回调方法中心,用于存储各种消息类型的消息回调方法处理。messageListenerMap的key即为msgType

import com.muly.wave.socket.api.message.SocketMsgCallbackInterface;
import java.util.HashMap;
import java.util.Map;

/**
 * 会话消息中心
 * 

* 维护所有消息类型对应的处理器 * * @author wave-muly * @date 2021/6/1 下午2:20 */ public class SocketMessageCenter { /** * 所有消息监听器维护 */ private static Map<String, SocketMsgCallbackInterface> messageListenerMap = new HashMap<>(); /** * 设置消息类型的监听器 * * @param msgType 消息类型 * @param listener 监听器 * @author wave-muly * @date 2021/6/1 下午2:25 **/ public static void setMessageListener(String msgType, SocketMsgCallbackInterface listener) { messageListenerMap.put(msgType, listener); } /** * 获取消息监听器 * * @param msgType 消息类型 * @return {@link SocketMsgCallbackInterface} * @author wave-muly * @date 2021/6/1 下午2:26 **/ public static SocketMsgCallbackInterface getSocketMsgCallbackInterface(String msgType) { return messageListenerMap.get(msgType); } }

消息监听处理器WebSocketServer

该类为WebSocket的入口类,用于处理和客户端的连接关闭及接收客户端发送的消息

import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.muly.wave.socket.api.enums.ClientMessageTypeEnum;
import com.muly.wave.socket.api.enums.ServerMessageTypeEnum;
import com.muly.wave.socket.api.enums.SystemMessageTypeEnum;
import com.muly.wave.socket.api.message.SocketMsgCallbackInterface;
import com.muly.wave.socket.api.session.pojo.SocketSession;
import com.muly.wave.socket.websocket.message.SocketMessageCenter;
import com.muly.wave.socket.websocket.operator.channel.WaveSocketOperator;
import com.muly.wave.socket.websocket.pojo.WebSocketMessageDTO;
import com.muly.wave.socket.websocket.session.SessionCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

/**
 * 消息监听处理器
 *
 * @author wave-muly
 * @date 2021/6/1 下午2:35
 */
@Slf4j
@ServerEndpoint(value = "/webSocket/{token}")
@Component
public class WebSocketServer {

    /**
     * 连接建立调用的方法
     * 

* 暂时无用,需要在建立连接的时候做一些事情的话可以修改这里 * * @param session 会话信息 * @author wave-muly * @date 2021/6/21 下午5:14 **/ @OnOpen public void onOpen(Session session, @PathParam("token") String token) { String userId = token; // todo 这里可以加上token校验处理获取用户信息 // try { // // 解析用户信息 // DefaultJwtPayload defaultPayload = JwtContext.me().getDefaultPayload(token); // userId = defaultPayload.getUserId().toString(); // } catch (io.jsonwebtoken.JwtException e) { // try { // session.close(); // } catch (IOException ioException) { // ioException.printStackTrace(); // } // } // *** 作api包装 WaveSocketOperator gunsSocketOperator = new WaveSocketOperator(session); // 回复消息 WebSocketMessageDTO replyMsg = new WebSocketMessageDTO(); replyMsg.setServerMsgType(ServerMessageTypeEnum.SYS_REPLY_MSG_TYPE.getCode()); replyMsg.setToUserId(userId); // 创建会话对象 SocketSession<WaveSocketOperator> socketSession = new SocketSession<>(); try { // 设置回复内容 replyMsg.setData(session.getId()); socketSession.setSessionId(session.getId()); socketSession.setUserId(userId); socketSession.setSocketOperatorApi(gunsSocketOperator); socketSession.setToken(token); socketSession.setConnectionTime(System.currentTimeMillis()); // 维护会话 SessionCenter.addSocketSession(socketSession); } finally { // 回复消息 gunsSocketOperator.writeAndFlush(replyMsg); // 触发首次连接回调 SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode()); if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) { // 触发回调 socketMsgCallbackInterface.callback(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode(), null, socketSession); } } } /** * 连接关闭调用的方法 * * @param session 会话信息 * @author wave-muly * @date 2021/6/21 下午5:14 **/ @OnClose public void onClose(Session session) { try { SocketSession<WaveSocketOperator> socketSession = SessionCenter.getSessionBySessionId(session.getId()); // 触发首次连接回调 SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(SystemMessageTypeEnum.SYS_LISTENER_ONCLOSE.getCode()); if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) { // 触发回调 socketMsgCallbackInterface.callback(SystemMessageTypeEnum.SYS_LISTENER_ONCLOSE.getCode(), null, socketSession); } } finally { SessionCenter.closed(session.getId()); } } /** * 收到消息调用的方法 * * @param message  接收到的消息 * @param socketChannel 会话信息 * @author wave-muly * @date 2021/6/21 下午5:14 **/ @OnMessage public void onMessage(String message, Session socketChannel) { // 转换为Java对象 WebSocketMessageDTO webSocketMessageDTO = JSON.parseObject(message, WebSocketMessageDTO.class); // 维护通道是否已初始化 SocketSession<WaveSocketOperator> socketSession = SessionCenter.getSessionBySessionId(socketChannel.getId()); // 心跳包 if (ObjectUtil.isNotEmpty(socketSession) && ClientMessageTypeEnum.USER_HEART.getCode().equals(webSocketMessageDTO.getClientMsgType())) { // 更新会话最后活跃时间 if (ObjectUtil.isNotEmpty(socketSession)) { socketSession.setLastActiveTime(System.currentTimeMillis()); } return; } // 用户ID为空不处理直接跳过 if (ObjectUtil.isEmpty(webSocketMessageDTO.getFormUserId())) { return; } // 会话建立成功执行业务逻辑 if (ObjectUtil.isNotEmpty(socketSession)) { // 更新最后会话时间 socketSession.setLastActiveTime(System.currentTimeMillis()); // 找到该消息的处理器 SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(webSocketMessageDTO.getClientMsgType()); if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) { // 触发回调 socketMsgCallbackInterface.callback(webSocketMessageDTO.getClientMsgType(), webSocketMessageDTO, socketSession); } else { socketChannel.getAsyncRemote().sendText("{\"serverMsgType\":\"404\"}"); } } } /** * 会话发送异常调用的方法 * * @param session 会话信息 * @param error  错误信息 * @author wave-muly * @date 2021/6/21 下午5:14 **/ @OnError public void onError(Session session, Throwable error) { SocketSession<WaveSocketOperator> socketSession = SessionCenter.getSessionBySessionId(session.getId()); // 触发首次连接回调 SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(SystemMessageTypeEnum.SYS_LISTENER_ONERROR.getCode()); if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) { // 触发回调 socketMsgCallbackInterface.callback(SystemMessageTypeEnum.SYS_LISTENER_ONERROR.getCode(), error, socketSession); } log.error("session 发生错误:" + session.getId()); } }

上述类中存在处理逻辑,定义了首次连接回调的方法,这里会去找SocketMessageCenter方法 *** 作中心里的SystemMessageTypeEnum.SYS_LISTENER_ONOPEN方法

			// 触发首次连接回调
            SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode());
            if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
                // 触发回调
                socketMsgCallbackInterface.callback(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode(), null, socketSession);
            }

以下为示例,项目启动时注入一个首次连接回调处理方法:在msgType为SystemMessageTypeEnum.SYS_LISTENER_ONOPEN(S00001)时会调用该接口

/**
 * @Author: wave-muly
 * @Date: 2021/10/9 14:09
 */
@Slf4j
@Component
@Order(99)
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MessageListener implements CommandLineRunner {
    private final SocketOperatorApi socketOperatorApi;
    @Override
    public void run(String... args) throws Exception {
        // 项目启动默认注册了 监听首次连接的监听器 在客户端连接时会调用该监听器
        socketOperatorApi.msgTypeCallback(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode(), (msgType, msg, socketSession)->{
            log.info("connection success");
            socketSession.getSocketOperatorApi().writeAndFlush("connection success");
        });
    }
}

再定义一个服务端向客户端发送消息的示例,这里定义一个Controller

import com.muly.wave.socket.api.SocketOperatorApi;
import com.muly.wave.socket.api.enums.ServerMessageTypeEnum;
import com.muly.wave.socket.api.exception.SocketException;
import com.muly.wave.socket.websocket.pojo.SysMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

import java.util.Date;


/**
 * @Author: wave-muly
 * @Date: 2021/10/9 11:28
 */
@RequestMapping("/api/v1/sys")
@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SysMessageController {

    private final SocketOperatorApi socketOperatorApi;

    @PostMapping(value = "/notice/{userId}", produces = MediaType.APPLICATION_JSON_VALUE)
    public void notice(@PathVariable(name = "userId") String userId, @RequestBody String messageContent) {
        SysMessage item = new SysMessage();
        item.setReceiveUserId(Long.valueOf(userId));
        item.setMessageContent(messageContent);
        item.setMessageType(ServerMessageTypeEnum.SYS_NOTICE_MSG_TYPE.getCode());
        item.setMessageSendTime(new Date());
        try {
            socketOperatorApi.sendMsgOfUserSession(ServerMessageTypeEnum.SYS_NOTICE_MSG_TYPE.getCode(), item.getReceiveUserId().toString(), item);
        } catch (SocketException socketException) {
            // 该用户不在线
        }
    }

}

以下为浏览器测试 [WebSocket在线测试](WebSocket在线测试_在线模拟websocket请求工具 (jsonin.com))

至此SpringBoot整合WebSocket完成,后续项目集成,只需要定义好msgType消息类型、处理token校验逻辑即可。
代码详见 wave-socket

欢迎分享,转载请注明来源:内存溢出

原文地址:https://www.54852.com/langs/730730.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-04-27
下一篇2022-04-27

发表评论

登录后才能评论

评论列表(0条)

    保存