前端websocket(基于 VueUse 的 WebSocket 优雅封装:打造高可用的全局连接管理方案)

前端websocket(基于 VueUse 的 WebSocket 优雅封装:打造高可用的全局连接管理方案)
基于 VueUse 的 WebSocket 优雅封装:打造高可用的全局连接管理方案

在现代前端开发中,WebSocket 作为全双工通信协议,被广泛应用于实时消息推送、在线协作、实时数据监控等场景。但原生 WebSocket API 使用繁琐,且在多连接、重连、心跳检测、状态管理等场景下需要大量重复代码。本文将分享基于 Vue3 + VueUse 的useWebSocket钩子,封装一套高可用、可扩展的全局 WebSocket 管理方案,解决上述痛点。

前端websocket(基于 VueUse 的 WebSocket 优雅封装:打造高可用的全局连接管理方案)

一、封装思路与核心特性

设计目标

  1. 单例管理:全局唯一的 WebSocket 管理器,避免重复创建连接
  2. 自动重连:基于 VueUse 内置能力实现可配置的重连策略
  3. 心跳检测:定时发送心跳包,确保连接活性
  4. 多连接支持:支持同时管理多个不同 URL 的 WebSocket 连接
  5. 事件订阅:灵活的消息、状态、心跳事件订阅机制
  6. 连接限制:防止过多连接占用资源
  7. 优雅的 API 封装:提供简洁、语义化的上层 API,降低使用成本

核心技术栈

  • Vue3(组合式 API)
  • VueUse useWebSocket(封装了 WebSocket 的核心能力,如重连、心跳)
  • ES6+(Class、Map、解构赋值等)

二、核心代码实现

1. 底层 WebSocket 管理器(websocket-manager.js)

实现全局连接的核心管理逻辑,包括连接创建、状态监听、消息分发、心跳处理等。

/** * WebSocket 管理器 * 实现全局唯一连接管理,利用 useWebSocket 内置的重连和心跳机制 */import { watch } from "vue";import { useWebSocket } from "@vueuse/core";class WebSocketManager { constructor() {  // 单例模式检查  if (WebSocketManager.instance) {   return WebSocketManager.instance;  }  /**   * 存储WebSocket连接实例   * Map   */  this.connections = new Map();  /**   * 存储状态监听器   * Map>   */  this.statusListeners = new Map();  /**   * 存储消息监听器   * Map>   */  this.messageListeners = new Map();  /**   * 存储心跳消息监听器   * Map>   */  this.heartbeatListeners = new Map();  /**   * 默认配置选项   * 注意:重连和心跳由 useWebSocket 内部处理   */  this.defaultOptions = {   // 自动重连配置(由 useWebSocket 处理)   autoReconnect: {    retries: 5, // 最大重试次数    delay: 2000, // 重连延迟(毫秒)    maxDelay: 10000, // 最大重连延迟    multiplier: 1.5, // 延迟递增倍数    onFailed: url => {     console.error(`WebSocket 连接重试失败: ${url}`);    }   },   // 心跳检测配置(由 useWebSocket 处理)   heartbeat: {    message: "ping", // 心跳消息内容    interval: 10000 // 心跳间隔(毫秒)   },   // 连接超时配置   timeout: 10000, // 连接超时时间(毫秒)   // 错误处理   onError: (ws, event) => {    console.error("WebSocket 连接发生错误:", event);   },   // 连接成功回调   onConnected: ws => {    //console.log("WebSocket 连接已建立:", ws.url);   },   // 连接关闭回调   onDisconnected: (ws, event) => {    //console.log("WebSocket 连接已断开:", ws.url, event);   }  };  /**   * 活跃连接数限制   */  this.maxConnections = 10;  WebSocketManager.instance = this; } /**  * 创建或获取WebSocket连接  * @param {string} url - WebSocket服务器地址  * @param {Object} options - 连接配置选项  * @param {Function} onMessageCallback - 接收消息时的回调函数  * @param {Function} onHeartbeatCallback - 接收心跳消息时的回调函数  * @returns {Object|null} 连接对象或null(当达到最大连接数时)  */ connect(url, options = {}, onMessageCallback = null, onHeartbeatCallback = null) {  // 检查是否已存在连接  if (this.connections.has(url)) {   const existingConn = this.connections.get(url);   // 如果连接已关闭,重新建立连接   if (existingConn.status !== "OPEN") {    this.disconnect(url);   } else {    console.log(`复用现有连接: ${url}`);    return existingConn;   }  }  // 检查连接数限制  if (this.connections.size >= this.maxConnections) {   console.warn(`已达到最大连接数限制 (${this.maxConnections})`);   return null;  }  // 合并配置选项  const mergedOptions = { ...this.defaultOptions, ...options };  try {   // 添加消息处理   const messageHandler = (ws, event) => {    // 检查是否为心跳消息    const isHeartbeatMessage = event.data === "pong" || event.data === "ping";    if (isHeartbeatMessage) {     // 如果是心跳消息,触发心跳监听器     if (onHeartbeatCallback) {      onHeartbeatCallback(event.data);     }     // 触发心跳消息监听器     const heartbeatListeners = this.heartbeatListeners.get(url) || [];     heartbeatListeners.forEach(listener => {      listener(event.data, url);     });     // 心跳消息不触发普通消息处理     return;    }    // 处理普通消息    if (onMessageCallback) {     onMessageCallback(event.data);    }    // 触发普通消息监听器    const listeners = this.messageListeners.get(url) || [];    listeners.forEach(listener => {     listener(event.data, url);    });   };   const { data, status, close, open, send } = useWebSocket(url, {    ...mergedOptions,    onMessage: messageHandler   });   // 创建连接对象,存储普通值而不是响应式引用   const connection = {    url,    data: data.value, // 存储普通值    status: status.value, // 存储普通值而不是响应式引用    close,    open,    send,    options: mergedOptions, // 保存连接选项    createdAt: new Date(), // 连接创建时间    messageCallback: onMessageCallback, // 保存消息回调    heartbeatCallback: onHeartbeatCallback // 保存心跳消息回调   };   // 添加状态监听器来更新普通值   const unwatch = watch(    () => status.value,    newStatus => {     // 直接更新存储在 connections Map 中的连接对象的状态     const storedConn = this.connections.get(url);     if (storedConn) {      storedConn.status = newStatus;      // 安全地触发外部监听器 - 添加检查      if (this.statusListeners.has(url)) {       const listeners = this.statusListeners.get(url) || [];       listeners.forEach(listener => {        listener(         newStatus,         storedConn.status === newStatus ? newStatus : storedConn.status,         url        );       });      }     }    }   );   // 添加取消监听函数到连接对象   connection.unwatchStatus = unwatch;   // 存储连接实例   this.connections.set(url, connection);   return connection;  } catch (error) {   console.error(`创建WebSocket连接失败: ${url}`, error);   return null;  } } /**  * 订阅连接状态变化  * @param {string} url - 连接URL  * @param {Function} callback - 状态变化回调函数  */ subscribeToStatusChange(url, callback) {  if (!this.statusListeners.has(url)) {   this.statusListeners.set(url, []);  }  const listeners = this.statusListeners.get(url);  listeners.push(callback);  // 返回取消订阅函数  return () => {   const index = listeners.indexOf(callback);   if (index > -1) {    listeners.splice(index, 1);   }  }; } /**  * 订阅消息接收  * @param {string} url - 连接URL  * @param {Function} callback - 消息接收回调函数  */ subscribeToMessages(url, callback) {  if (!this.messageListeners.has(url)) {   this.messageListeners.set(url, []);  }  const listeners = this.messageListeners.get(url);  listeners.push(callback);  // 返回取消订阅函数  return () => {   const index = listeners.indexOf(callback);   if (index > -1) {    listeners.splice(index, 1);   }  }; } /**  * 订阅心跳消息接收  * @param {string} url - 连接URL  * @param {Function} callback - 心跳消息接收回调函数  */ subscribeToHeartbeatMessages(url, callback) {  if (!this.heartbeatListeners.has(url)) {   this.heartbeatListeners.set(url, []);  }  const listeners = this.heartbeatListeners.get(url);  listeners.push(callback);  // 返回取消订阅函数  return () => {   const index = listeners.indexOf(callback);   if (index > -1) {    listeners.splice(index, 1);   }  }; } /**  * 发送消息到指定连接  * @param {string} url - 目标连接URL  * @param {any} message - 要发送的消息  * @returns {boolean} 是否发送成功  */ sendMessage(url, message) {  const conn = this.connections.get(url);  if (!conn) {   console.warn(`WebSocket 连接不存在: ${url}`);   return false;  }  // 检查连接状态  if (conn.status === "OPEN") {   try {    conn.send(message);    return true;   } catch (error) {    console.error(`发送消息失败: ${url}`, error);    return false;   }  } else {   console.warn(`WebSocket 连接未打开: ${url}, 当前状态: ${conn.status}`);   return false;  } } /**  * 批量发送消息  * @param {Array<{url: string, message: any}>} messages - 消息数组  * @returns {Array<{url: string, success: boolean}>} 发送结果  */ sendBatchMessages(messages) {  return messages.map(({ url, message }) => ({   url,   success: this.sendMessage(url, message)  })); } /**  * 关闭并移除连接  * @param {string} url - 要关闭的连接URL  */ disconnect(url) {  const conn = this.connections.get(url);  if (conn) {   // 保存状态监听器,因为稍后会删除   const statusListeners = this.statusListeners.get(url)?.slice() || [];   const previousStatus = conn.status; // 保存当前状态   // 关闭连接   if (typeof conn.close === "function") {    conn.close();   }   // 取消状态监听器   if (conn.unwatchStatus) {    conn.unwatchStatus();   }   // 手动触发状态变更通知,告知连接已关闭 - 在删除连接前   statusListeners.forEach(listener => {    listener(     "CLOSED", // 新状态     previousStatus, // 旧状态     url    );   });   // 移除连接记录   this.connections.delete(url);   // 移除状态监听器   this.statusListeners.delete(url);   // 移除消息监听器   this.messageListeners.delete(url);   // 移除心跳监听器   this.heartbeatListeners.delete(url);  } else {   console.warn(`尝试断开不存在的连接: ${url}`);  } } /**  * 批量关闭连接  * @param {Array} urls - 要关闭的连接URL数组  */ disconnectBatch(urls) {  urls.forEach(url => this.disconnect(url)); } /**  * 关闭所有连接  */ disconnectAll() {  for (const [url] of this.connections) {   this.disconnect(url);  }  console.log("所有 WebSocket 连接已断开"); } /**  * 获取指定连接的状态  * @param {string} url - 连接URL  * @returns {string|undefined} 连接状态  */ getStatus(url) {  const conn = this.connections.get(url);  if (conn) {   return conn.status;  } else {   return "CLOSED"; // 默认返回CLOSED状态  } } /**  * 获取所有连接状态  * @returns {Object} 包含所有连接状态的对象  */ getAllStatus() {  const statuses = {};  for (const [url, conn] of this.connections) {   statuses[url] = {    status: conn.status,    url: conn.url,    createdAt: conn.createdAt   };  }  return statuses; } /**  * 获取当前活跃连接数  * @returns {number} 活跃连接数  */ getActiveConnectionCount() {  let count = 0;  for (const [, conn] of this.connections) {   if (conn.status === "OPEN") {    count++;   }  }  return count; } /**  * 获取连接总数  * @returns {number} 总连接数  */ getTotalConnectionCount() {  return this.connections.size; } /**  * 检查是否有活动连接  * @returns {boolean} 是否有活动连接  */ hasActiveConnections() {  for (const [, conn] of this.connections) {   if (conn.status === "OPEN") {    return true;   }  }  return false; } /**  * 重新连接指定的连接  * @param {string} url - 要重新连接的URL  * @param {Function} onMessageCallback - 接收消息时的回调函数  * @param {Function} onHeartbeatCallback - 接收心跳消息时的回调函数  */ reconnect(url, onMessageCallback = null, onHeartbeatCallback = null) {  const conn = this.connections.get(url);  if (conn) {   this.disconnect(url);   // 使用原始配置重新连接,并传递消息回调   this.connect(url, conn.options, onMessageCallback, onHeartbeatCallback);  } } /**  * 获取指定连接的创建时间  * @param {string} url - 连接URL  * @returns {Date|null} 连接创建时间或null(如果连接不存在)  */ getConnectionTime(url) {  const conn = this.connections.get(url);  if (conn) {   return conn.createdAt;  } else {   return null;  } } /**  * 获取所有连接的创建时间  * @returns {Object} 包含所有连接创建时间的对象  */ getAllConnectionTimes() {  const times = {};  for (const [url, conn] of this.connections) {   times[url] = conn.createdAt;  }  return times; } /**  * 获取连接已运行时间(毫秒)  * @param {string} url - 连接URL  * @returns {number|null} 连接已运行时间(毫秒)或null(如果连接不存在)  */ getConnectionUptime(url) {  const conn = this.connections.get(url);  if (conn) {   return Date.now() - conn.createdAt.getTime();  } else {   return null;  } }}export default new WebSocketManager();

2. 上层 API 封装(websocket-service.js)

对底层管理器进行封装,提供更简洁、易用的上层 API,降低业务层使用成本。

/** * WebSocket 功能模块 - 统一入口 * 基于 WebSocketManager 封装的高级 API */import wsManager from "./websocket-manager.js";/** * 默认的 WebSocket 连接 URL * @type {string} */let currentWebSocketUrl = null;/** * 存储消息回调函数 * Map */const messageCallbacks = new Map();/** * 存储心跳消息回调函数 * Map */const heartbeatCallbacks = new Map();/** * 连接WebSocket * @param {string} [url] - WebSocket服务器地址,如果不提供则使用默认URL * @param {Object} [options] - 连接配置选项 * @param {Function} [onMessageCallback] - 接收消息时的回调函数 * @param {Function} [onHeartbeatCallback] - 接收心跳消息时的回调函数 * @returns {Object|null} 连接对象或null(连接失败时) */export function connectWebSocket(url, options = {}, onMessageCallback = null, onHeartbeatCallback = null) { try {  currentWebSocketUrl = url;  // 使用 wsManager.connect 方法连接,传递消息回调和心跳回调  const connection = wsManager.connect(url, options, onMessageCallback, onHeartbeatCallback);  // 如果提供了回调函数,也存储到本地映射  if (onMessageCallback) {   messageCallbacks.set(url, onMessageCallback);  }  if (onHeartbeatCallback) {   heartbeatCallbacks.set(url, onHeartbeatCallback);  }  if (connection) {   return connection;  } else {   return null;  } } catch (error) {  console.error("连接WebSocket失败:", error);  return null; }}/** * 断开WebSocket连接 * @param {string} [url] - WebSocket服务器地址,如果不提供则使用当前连接的URL * @returns {boolean} 是否成功断开 */export function disconnectWebSocket(url) { try {  const targetUrl = url || currentWebSocketUrl;  // 清除对应的消息回调和心跳回调  messageCallbacks.delete(targetUrl);  heartbeatCallbacks.delete(targetUrl);  wsManager.disconnect(targetUrl);  if (url === currentWebSocketUrl) {   currentWebSocketUrl = null;  }  return true; } catch (error) {  console.error("断开WebSocket失败:", error);  return false; }}/** * 断开所有WebSocket连接 * @returns {boolean} 是否成功断开所有连接 */export function disconnectAllWebSockets() { try {  wsManager.disconnectAll();  messageCallbacks.clear(); // 清除所有消息回调  heartbeatCallbacks.clear(); // 清除所有心跳回调  currentWebSocketUrl = null;  return true; } catch (error) {  console.error("断开所有WebSocket失败:", error);  return false; }}/** * 重新连接WebSocket * @param {string} [url] - WebSocket服务器地址,如果不提供则使用当前连接的URL * @param {Object} [options] - 连接配置选项 * @param {Function} [onMessageCallback] - 接收消息时的回调函数 * @param {Function} onHeartbeatCallback - 接收心跳消息时的回调函数 * @returns {Object|null} 连接对象或null(重新连接失败时) */export function reconnectWebSocket(url, options = {}, onMessageCallback = null, onHeartbeatCallback = null) { try {  const targetUrl = url || currentWebSocketUrl;  // 先断开现有连接  disconnectWebSocket(targetUrl);  // 重新连接,传递消息回调和心跳回调  return connectWebSocket(targetUrl, options, onMessageCallback, onHeartbeatCallback); } catch (error) {  console.error("重新连接WebSocket失败:", error);  return null; }}/** * 发送消息到WebSocket * @param {string} url - WebSocket服务器地址 * @param {any} message - 要发送的消息 * @returns {boolean} 消息是否发送成功 */export function sendWebSocketMessage(url, message) { try {  // 使用 wsManager.sendMessage 方法发送消息  const result = wsManager.sendMessage(url, message);  return result; } catch (error) {  console.error("发送WebSocket消息失败:", error);  return false; }}/** * 订阅消息接收 * @param {string} url - 连接URL * @param {Function} callback - 消息接收回调函数 * @returns {Function} 取消订阅的函数 */export function subscribeToMessages(url, callback) { try {  return wsManager.subscribeToMessages(url, callback); } catch (error) {  console.error("订阅消息接收失败:", error);  return () => {}; }}/** * 订阅心跳消息接收 * @param {string} url - 连接URL * @param {Function} callback - 心跳消息接收回调函数 * @returns {Function} 取消订阅的函数 */export function subscribeToHeartbeatMessages(url, callback) { try {  return wsManager.subscribeToHeartbeatMessages(url, callback); } catch (error) {  console.error("订阅心跳消息接收失败:", error);  return () => {}; }}/** * 获取指定连接的状态 * @param {string} url - WebSocket服务器地址 * @returns {string|undefined} 连接状态 */export function getConnectionStatus(url) { try {  return wsManager.getStatus(url); } catch (error) {  console.error("获取连接状态失败:", error);  return undefined; }}/** * 获取所有连接状态 * @returns {Object} 包含所有连接状态的对象 */export function getAllConnectionStatus() { try {  return wsManager.getAllStatus(); } catch (error) {  console.error("获取所有连接状态失败:", error);  return {}; }}/** * 检查指定连接是否已连接 * @param {string} url - WebSocket服务器地址 * @returns {boolean} 连接是否处于 OPEN 状态 */export function isWebSocketConnected(url) { try {  const status = wsManager.getStatus(url);  return status === "OPEN"; } catch (error) {  console.error("检查连接状态失败:", error);  return false; }}/** * 检查指定连接是否正在连接 * @param {string} url - WebSocket服务器地址 * @returns {boolean} 连接是否处于 CONNECTING 状态 */export function isWebSocketConnecting(url) { try {  const status = wsManager.getStatus(url);  return status === "CONNECTING"; } catch (error) {  console.error("检查连接状态失败:", error);  return false; }}/** * 获取当前活跃连接数 * @returns {number} 活跃连接数 */export function getActiveConnectionCount() { try {  return wsManager.getActiveConnectionCount(); } catch (error) {  console.error("获取活跃连接数失败:", error);  return 0; }}/** * 获取总连接数 * @returns {number} 总连接数 */export function getTotalConnectionCount() { try {  return wsManager.getTotalConnectionCount(); } catch (error) {  console.error("获取总连接数失败:", error);  return 0; }}/** * 检查是否存在活动连接 * @returns {boolean} 是否存在活动连接 */export function hasActiveConnections() { try {  return wsManager.hasActiveConnections(); } catch (error) {  console.error("检查活动连接失败:", error);  return false; }}/** * 监听连接状态变化 * @param {string} url - 连接URL * @param {Function} callback - 状态变化回调函数,接收 (newStatus, oldStatus, url) 参数 * @returns {Function} 取消监听的函数 */export function onConnectionStatusChange(url, callback) { if (typeof callback !== "function") {  console.warn("状态变化回调必须是函数");  return () => {}; } try {  return wsManager.subscribeToStatusChange(url, callback); } catch (error) {  console.error("监听连接状态变化失败:", error);  return () => {}; }}/** * 批量发送消息 * @param {Array<{url: string, message: any}>} messages - 消息数组 * @returns {Array<{url: string, success: boolean}>} 发送结果 */export function sendBatchMessages(messages) { try {  return messages.map(({ url, message }) => ({   url,   success: sendWebSocketMessage(url, message)  })); } catch (error) {  console.error("批量发送消息失败:", error);  return []; }}/** * 批量断开连接 * @param {Array} urls - 要断开的连接URL数组 * @returns {boolean} 操作是否成功 */export function disconnectBatchWebSockets(urls) { try {  // 清除对应的回调  urls.forEach(url => {   messageCallbacks.delete(url);   heartbeatCallbacks.delete(url);  });  wsManager.disconnectBatch(urls);  console.log(`批量断开 ${urls.length} 个 WebSocket 连接`);  return true; } catch (error) {  console.error("批量断开WebSocket失败:", error);  return false; }}/** * 获取当前使用的 WebSocket URL * @returns {string|null} 当前 WebSocket URL 或 null(如果没有连接) */export function getCurrentWebSocketUrl() { return currentWebSocketUrl;}/** * 获取存储的消息回调函数 * @param {string} url - WebSocket服务器地址 * @returns {Function|null} 消息回调函数或null */export function getMessageCallback(url) { return messageCallbacks.get(url) || null;}/** * 设置消息回调函数 * @param {string} url - WebSocket服务器地址 * @param {Function} callback - 消息回调函数 */export function setMessageCallback(url, callback) { if (typeof callback === "function") {  messageCallbacks.set(url, callback); } else {  console.warn("消息回调必须是函数"); }}/** * 获取存储的心跳消息回调函数 * @param {string} url - WebSocket服务器地址 * @returns {Function|null} 心跳消息回调函数或null */export function getHeartbeatCallback(url) { return heartbeatCallbacks.get(url) || null;}/** * 设置心跳消息回调函数 * @param {string} url - WebSocket服务器地址 * @param {Function} callback - 心跳消息回调函数 */export function setHeartbeatCallback(url, callback) { if (typeof callback === "function") {  heartbeatCallbacks.set(url, callback); } else {  console.warn("心跳消息回调必须是函数"); }}/** * 获取指定连接的创建时间 * @param {string} url - WebSocket服务器地址 * @returns {Date|null} 连接创建时间或null(如果连接不存在) */export function getConnectionTime(url) { try {  return wsManager.getConnectionTime(url); } catch (error) {  console.error("获取连接时间失败:", error);  return null; }}/** * 获取所有连接的创建时间 * @returns {Object} 包含所有连接创建时间的对象 */export function getAllConnectionTimes() { try {  return wsManager.getAllConnectionTimes(); } catch (error) {  console.error("获取所有连接时间失败:", error);  return {}; }}/** * 获取连接已运行时间(毫秒) * @param {string} url - WebSocket服务器地址 * @returns {number|null} 连接已运行时间(毫秒)或null(如果连接不存在) */export function getConnectionUptime(url) { try {  return wsManager.getConnectionUptime(url); } catch (error) {  console.error("获取连接运行时间失败:", error);  return null; }}

三、使用示例

1. 基础连接与消息接收

import {   connectWebSocket,   subscribeToMessages,   sendWebSocketMessage,  onConnectionStatusChange} from "./websocket-service.js";// 1. 连接WebSocketconst wsUrl = "ws://localhost:8080/ws";const connection = connectWebSocket(  wsUrl,  {    // 自定义配置(可选)    autoReconnect: { retries: 3 },    heartbeat: { interval: 8000 }  },  // 消息回调  (message) => {    console.log("收到消息:", message);  },  // 心跳回调  (heartbeatMsg) => {    console.log("收到心跳:", heartbeatMsg);  });// 2. 订阅消息(额外的订阅方式)const unsubscribe = subscribeToMessages(wsUrl, (message, url) => {  console.log(`从${url}收到消息:`, message);});// 3. 监听连接状态变化const unWatchStatus = onConnectionStatusChange(wsUrl, (newStatus, oldStatus, url) => {  console.log(`连接${url}状态变化: ${oldStatus} -> ${newStatus}`);  if (newStatus === "OPEN") {    console.log("连接已建立,可发送消息");  } else if (newStatus === "CLOSED") {    console.log("连接已断开");  }});// 4. 发送消息if (connection) {  const sendSuccess = sendWebSocketMessage(wsUrl, JSON.stringify({ type: "ping", data: "hello" }));  console.log("消息发送结果:", sendSuccess);}// 5. 取消订阅(组件卸载时)// unsubscribe();// unWatchStatus();// 6. 断开连接// disconnectWebSocket(wsUrl);

2. 批量操作

import { sendBatchMessages, disconnectBatchWebSockets } from "./websocket-service.js";// 批量发送消息const messages = [  { url: "ws://localhost:8080/ws1", message: "消息1" },  { url: "ws://localhost:8080/ws2", message: "消息2" }];const sendResults = sendBatchMessages(messages);console.log("批量发送结果:", sendResults);// 批量断开连接const urls = ["ws://localhost:8080/ws1", "ws://localhost:8080/ws2"];const disconnectSuccess = disconnectBatchWebSockets(urls);console.log("批量断开结果:", disconnectSuccess);

3. 组件中使用(Vue3)

[xss_clean]import { ref, onUnmounted } from "vue";import {   connectWebSocket,   disconnectWebSocket,   sendWebSocketMessage,  getConnectionStatus,  onConnectionStatusChange} from "./websocket-service.js";const wsUrl = "ws://localhost:8080/ws";const connectionStatus = ref("CLOSED");// 监听状态变化const unWatchStatus = onConnectionStatusChange(wsUrl, (newStatus) => {  connectionStatus.value = newStatus;});// 连接const connect = () => {  connectWebSocket(wsUrl, {}, (message) => {    console.log("收到消息:", message);  });  connectionStatus.value = getConnectionStatus(wsUrl);};// 断开const disconnect = () => {  disconnectWebSocket(wsUrl);  connectionStatus.value = getConnectionStatus(wsUrl);};// 发送消息const sendMessage = () => {  sendWebSocketMessage(wsUrl, "Hello WebSocket");};// 组件卸载时清理onUnmounted(() => {  unWatchStatus();  disconnectWebSocket(wsUrl);});[xss_clean]

四、封装亮点与优化点

亮点

  1. 单例模式:确保全局只有一个 WebSocket 管理器实例,避免重复创建
  2. 连接复用:相同 URL 的连接会复用,避免重复连接
  3. 分离关注点:底层管理器负责核心逻辑,上层 API 负责易用性封装
  4. 灵活的订阅机制:支持消息、心跳、状态的订阅 / 取消订阅,适配不同业务场景
  5. 完善的错误处理:每个核心方法都有 try-catch,保证程序健壮性
  6. 连接限制:防止过多连接导致的性能问题

可优化方向

  1. 连接池管理:实现连接的自动回收(如长时间闲置的连接自动断开)
  2. 消息队列:当连接未建立时,将消息加入队列,连接建立后自动发送
  3. 日志系统:集成更完善的日志记录,方便问题排查
  4. 类型支持:添加 TypeScript 类型定义,提升开发体验
  5. 断线重连策略优化:根据网络状态动态调整重连策略

五、总结

本文基于 VueUse 的useWebSocket钩子,封装了一套功能完善、易用性高的 WebSocket 管理方案,解决了原生 WebSocket 在实际开发中的诸多痛点。该方案具备单例管理、自动重连、心跳检测、多连接支持、事件订阅等核心能力,同时提供了简洁的上层 API,可直接应用于生产环境。

通过合理的分层设计(底层管理器 + 上层 API),既保证了核心逻辑的可维护性,又降低了业务层的使用成本。开发者可以根据实际业务需求,基于此封装扩展更多功能(如消息加密、数据解析、连接池等),满足不同场景的需求。

文章版权声明:除非注明,否则均为边学边练网络文章,版权归原作者所有