1. WebSocket基础概念
1.1 WebSocket协议简介
WebSocket是一种在单个TCP连接上进行全双工通信的协议,主要特点如下:
- 持久连接:连接建立后可长期保持通信
- 双向通信:客户端和服务器都可以主动发送数据
- 低延迟:避免了每条消息都要经历HTTP请求/响应的开销
- 实时性:服务器可以实时推送数据
- 协议升级:通过HTTP的Upgrade机制从HTTP协议升级而来
1.2 WebSocket在OpenResty中的应用
┌─────────────────────────────────────────────────────────┐
│ 客户端应用 │
│ ┌─────────────┬─────────────┬─────────────┬─────────┐ │
│ │ Web浏览器 │ 移动应用 │ 桌面应用 │ IoT设备 │ │
│ └─────────────┴─────────────┴─────────────┴─────────┘ │
└─────────────────┬───────────────────────────────────────┘
│ WebSocket连接
┌─────────────────▼───────────────────────────────────────┐
│ OpenResty WebSocket网关 │
│ ┌─────────────┬─────────────┬─────────────┬─────────┐ │
│ │ 连接管理 │ 消息路由 │ 认证鉴权 │ 负载均衡│ │
│ └─────────────┴─────────────┴─────────────┴─────────┘ │
└─────────┬───────────┬───────────┬───────────┬───────────┘
│ │ │ │
┌─────────▼─┐ ┌───────▼─┐ ┌───────▼─┐ ┌───────▼─┐
│ 聊天服务 │ │ 推送服务 │ │ 游戏服务 │ │ 监控服务 │
└───────────┘ └─────────┘ └─────────┘ └─────────┘
1.3 应用场景
- 即时通讯:聊天应用、在线客服
- 实时推送:消息通知、状态更新
- 在线游戏:多人游戏、实时对战
- 数据监控:实时图表、系统监控
- 协作工具:在线编辑、白板应用
- 金融交易:股票行情、交易数据
2. OpenResty WebSocket配置
2.1 基础配置
# nginx.conf - WebSocket configuration
http {
    # Shared memory zones used by the Lua modules
    lua_shared_dict websocket_connections 100m;
    lua_shared_dict websocket_rooms 50m;
    lua_shared_dict websocket_users 50m;

    # Lua module search path. Modules are loaded as require "websocket.xxx",
    # which "?" expands to <dir>/websocket/xxx.lua, so the template must point
    # at the lualib directory itself (not lualib/websocket). The trailing ";;"
    # appends the default OpenResty search path.
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";

    # Initialisation
    init_by_lua_block {
        require "websocket.init"
    }
    init_worker_by_lua_block {
        require "websocket.worker"
    }

    # WebSocket server
    server {
        listen 8080;
        server_name ws.example.com;

        # Generic WebSocket endpoint.
        # resty.websocket.server performs the HTTP Upgrade handshake itself in
        # content_by_lua; proxy_* directives only take effect with proxy_pass,
        # so no Upgrade/Connection header forwarding is needed here.
        location /ws {
            content_by_lua_block {
                local websocket_handler = require "websocket.handler"
                websocket_handler.handle_connection()
            }
        }

        # Chat WebSocket
        location /ws/chat {
            content_by_lua_block {
                local chat_handler = require "websocket.chat"
                chat_handler.handle_chat_connection()
            }
        }

        # Push-service WebSocket
        location /ws/push {
            content_by_lua_block {
                local push_handler = require "websocket.push"
                push_handler.handle_push_connection()
            }
        }

        # Game-service WebSocket
        location /ws/game {
            content_by_lua_block {
                local game_handler = require "websocket.game"
                game_handler.handle_game_connection()
            }
        }

        # WebSocket admin API
        # NOTE(review): served on the public listener; restrict access here
        # (allow/deny) unless websocket.admin authenticates requests itself.
        location /api/ws/ {
            content_by_lua_block {
                local ws_admin = require "websocket.admin"
                ws_admin.handle_request()
            }
        }

        # Static files
        location / {
            root /var/www/websocket;
            index index.html;
        }
    }
}
2.2 WebSocket核心处理模块
-- websocket/handler.lua - WebSocket核心处理模块
local handler = {}
local server = require "resty.websocket.server"
local cjson = require "cjson"
local auth = require "websocket.auth"
local connection_manager = require "websocket.connection_manager"
local message_router = require "websocket.message_router"
local logger = require "websocket.logger"
-- Receive-loop tuning for handler.handle_connection.
local RECV_POLL_MS = 1000      -- recv_frame() timeout; also how often queued messages are flushed
local IDLE_PING_SECONDS = 30   -- ping a client after this long without any frame from it

-- Encode `payload` as JSON and send it as a text frame.
-- Returns whatever wb:send_text returns (bytes sent, or nil and an error).
local function send_json(wb, payload)
    return wb:send_text(cjson.encode(payload))
end

-- Send every message queued for this connection through
-- message_router.queue_message (read back with get_queued_messages).
-- Returns false as soon as a send fails, true otherwise.
local function flush_queued_messages(wb, connection_id)
    for _, entry in ipairs(message_router.get_queued_messages(connection_id)) do
        if type(entry) == "table" and entry.message ~= nil then
            local ok, err = send_json(wb, entry.message)
            if not ok then
                ngx.log(ngx.ERR, "Failed to deliver queued message to ",
                    connection_id, ": ", err)
                return false
            end
        end
    end
    return true
end

-- Handle one /ws client for the lifetime of the request: handshake,
-- authentication, registration in shared memory, the frame loop, and
-- unregistration when the loop ends.
--
-- A recv_frame() timeout means "client idle", not failure: lua-resty-websocket
-- only sets wb.fatal for real errors. Each wake-up flushes messages other
-- handlers queued for this connection, and idle clients are pinged so their
-- pong keeps last_ping fresh for the expiry cleanup.
function handler.handle_connection()
    local wb, err = server:new({
        timeout = RECV_POLL_MS,
        max_payload_len = 65535, -- maximum frame payload in bytes
    })
    if not wb then
        ngx.log(ngx.ERR, "Failed to create WebSocket server: ", err)
        return ngx.exit(444)
    end

    -- The worker pid is included because math.random() is not necessarily
    -- seeded per worker, so IP + second + random alone can collide.
    local connection_id = string.format("%s-%d-%d-%d",
        ngx.var.remote_addr, ngx.time(), ngx.worker.pid(), math.random(10000, 99999))
    ngx.log(ngx.INFO, "New WebSocket connection: ", connection_id)

    local auth_result = auth.authenticate_websocket()
    if not auth_result.success then
        -- 1008 = Policy Violation
        wb:send_close(1008, auth_result.error or "Authentication failed")
        return
    end
    local user = auth_result.user

    local connection_info = {
        connection_id = connection_id,
        user_id = user.user_id,
        username = user.username,
        -- Read by the "broadcast" message handler (admin-only check)
        role = user.role,
        remote_addr = ngx.var.remote_addr,
        user_agent = ngx.var.http_user_agent,
        connect_time = ngx.time(),
        last_ping = ngx.time(),
        status = "connected"
    }
    connection_manager.register_connection(connection_info)

    local ok, send_err = send_json(wb, {
        type = "connection",
        status = "connected",
        connection_id = connection_id,
        user = {
            user_id = user.user_id,
            username = user.username
        },
        timestamp = ngx.time()
    })
    if not ok then
        ngx.log(ngx.ERR, "Failed to send welcome message: ", send_err)
        connection_manager.unregister_connection(connection_id)
        return
    end

    local last_seen = ngx.time()
    while flush_queued_messages(wb, connection_id) do
        local data, typ, recv_err = wb:recv_frame()
        if wb.fatal then
            ngx.log(ngx.WARN, "WebSocket receive error: ", recv_err)
            break
        end

        if not data then
            -- Timeout: nothing arrived within RECV_POLL_MS
            if ngx.time() - last_seen >= IDLE_PING_SECONDS then
                local pinged, ping_err = wb:send_ping()
                if not pinged then
                    ngx.log(ngx.ERR, "Failed to send ping: ", ping_err)
                    break
                end
                last_seen = ngx.time()
            end
        else
            last_seen = ngx.time()
            connection_manager.update_last_activity(connection_id)

            if typ == "close" then
                ngx.log(ngx.INFO, "WebSocket connection closed by client: ", connection_id)
                -- Complete the closing handshake (RFC 6455 section 5.5.1)
                wb:send_close()
                break
            elseif typ == "ping" then
                local ponged, pong_err = wb:send_pong(data)
                if not ponged then
                    ngx.log(ngx.ERR, "Failed to send pong: ", pong_err)
                    break
                end
            elseif typ == "pong" then
                ngx.log(ngx.DEBUG, "Received pong from client: ", connection_id)
            elseif typ == "text" then
                local decoded, message = pcall(cjson.decode, data)
                if decoded then
                    -- An error inside a routed handler must not skip the
                    -- unregister_connection call after the loop
                    local handled, handle_err = pcall(handler.process_message,
                        wb, connection_info, message)
                    if not handled then
                        ngx.log(ngx.ERR, "Failed to process message from ",
                            connection_id, ": ", handle_err)
                    end
                else
                    ngx.log(ngx.WARN, "Invalid JSON message from ", connection_id, ": ", data)
                    send_json(wb, {
                        type = "error",
                        error = "Invalid JSON format",
                        timestamp = ngx.time()
                    })
                end
            elseif typ == "binary" then
                handler.process_binary_message(wb, connection_info, data)
            end
        end
    end

    connection_manager.unregister_connection(connection_id)
    ngx.log(ngx.INFO, "WebSocket connection closed: ", connection_id)
end
-- Validate a decoded client message, log it, route it by `type`, and send the
-- router's response (if any) back to the sender.
-- @param wb              resty.websocket.server instance of this connection
-- @param connection_info connection record built in handle_connection
-- @param message         result of cjson.decode on the text frame
function handler.process_message(wb, connection_info, message)
    -- cjson.decode accepts any JSON value ("1", "\"x\"", "null"), so the
    -- payload is not necessarily a table; indexing a non-table would raise.
    local reason
    if type(message) ~= "table" then
        reason = "Message must be a JSON object"
    elseif not message.type then
        reason = "Message type is required"
    end
    if reason then
        local error_response = {
            type = "error",
            error = reason,
            timestamp = ngx.time()
        }
        wb:send_text(cjson.encode(error_response))
        return
    end

    logger.log_message({
        connection_id = connection_info.connection_id,
        user_id = connection_info.user_id,
        message_type = message.type,
        message = message,
        timestamp = ngx.time()
    })

    local response = message_router.route_message(connection_info, message)
    if response then
        local ok, err = wb:send_text(cjson.encode(response))
        if not ok then
            ngx.log(ngx.ERR, "Failed to send response: ", err)
        end
    end
end
-- Acknowledge a binary frame by echoing its byte size back as a JSON
-- "binary_ack" text frame. The payload itself is not stored or interpreted;
-- this is the hook for uploads/images.
function handler.process_binary_message(wb, connection_info, data)
    local byte_count = #data
    ngx.log(ngx.INFO, "Received binary message from ", connection_info.connection_id,
        ", size: ", byte_count)

    local ack = { type = "binary_ack", size = byte_count, timestamp = ngx.time() }
    wb:send_text(cjson.encode(ack))
end
-- Send `message` to a single connection.
-- The socket is owned by the coroutine handling that connection (possibly in
-- another worker), so the message is placed in that connection's shared-memory
-- queue via message_router.queue_message rather than written directly.
-- Returns true on success, or false plus an error string.
function handler.send_message_to_connection(connection_id, message)
    if not connection_manager.get_connection(connection_id) then
        return false, "Connection not found"
    end
    if not message_router.queue_message(connection_id, message) then
        return false, "Failed to queue message"
    end
    return true
end
-- Send `message` to every registered connection.
-- @param filter optional predicate filter(connection_record) -> truthy to send
-- @return number of connections for which send_message_to_connection succeeded
function handler.broadcast_message(message, filter)
    local delivered = 0
    for connection_id, connection in pairs(connection_manager.get_all_connections()) do
        local wanted = (filter == nil) or filter(connection)
        if wanted and handler.send_message_to_connection(connection_id, message) then
            delivered = delivered + 1
        end
    end
    return delivered
end
return handler
3. 连接管理
3.1 连接管理器
-- websocket/connection_manager.lua - 连接管理模块
local connection_manager = {}
local cjson = require "cjson"
-- Store a connection record (1 h TTL) in the websocket_connections dict,
-- append its ID to the owner's "user:<user_id>" index, and bump the
-- "stats:" counters.
-- Returns true, or false plus an error message.
function connection_manager.register_connection(connection_info)
    local dict = ngx.shared.websocket_connections
    if not dict then
        return false, "Shared memory not available"
    end

    local id = connection_info.connection_id
    local owner = connection_info.user_id

    local stored, set_err = dict:set(id, cjson.encode(connection_info), 3600)
    if not stored then
        ngx.log(ngx.ERR, "Failed to register connection: ", set_err)
        return false, set_err
    end

    -- Secondary index: "user:<user_id>" -> JSON array of connection IDs
    local index_key = "user:" .. owner
    local raw_index = dict:get(index_key)
    local ids = {}
    if raw_index then
        ids = cjson.decode(raw_index)
    end
    ids[#ids + 1] = id
    dict:set(index_key, cjson.encode(ids), 3600)

    dict:incr("stats:total_connections", 1, 0)
    dict:incr("stats:user:" .. owner .. ":connections", 1, 0)

    ngx.log(ngx.INFO, "Connection registered: ", id)
    return true
end
-- Remove a connection record, drop its ID from the owner's "user:" index
-- (deleting the index once empty), and decrement the "stats:" counters.
-- Returns true, or false plus an error message.
function connection_manager.unregister_connection(connection_id)
    local dict = ngx.shared.websocket_connections
    if not dict then
        return false, "Shared memory not available"
    end

    local raw = dict:get(connection_id)
    if not raw then
        return false, "Connection not found"
    end
    local record = cjson.decode(raw)
    dict:delete(connection_id)

    local index_key = "user:" .. record.user_id
    local raw_index = dict:get(index_key)
    if raw_index then
        local remaining = {}
        for _, other_id in ipairs(cjson.decode(raw_index)) do
            if other_id ~= connection_id then
                remaining[#remaining + 1] = other_id
            end
        end
        if next(remaining) == nil then
            dict:delete(index_key)
        else
            dict:set(index_key, cjson.encode(remaining), 3600)
        end
    end

    local user_counter = "stats:user:" .. record.user_id .. ":connections"
    dict:incr("stats:total_connections", -1, 0)
    dict:incr(user_counter, -1, 0)

    ngx.log(ngx.INFO, "Connection unregistered: ", connection_id)
    return true
end
-- Fetch one connection record; returns the decoded table, or nil when the
-- record (or the shared dict) does not exist.
function connection_manager.get_connection(connection_id)
    local dict = ngx.shared.websocket_connections
    local raw = dict and dict:get(connection_id)
    if not raw then
        return nil
    end
    return cjson.decode(raw)
end
-- Return the user's live connections as { [connection_id] = record }.
-- IDs in the user index whose record has already expired are skipped.
function connection_manager.get_user_connections(user_id)
    local dict = ngx.shared.websocket_connections
    if not dict then
        return {}
    end

    local raw_index = dict:get("user:" .. user_id)
    local found = {}
    if not raw_index then
        return found
    end

    for _, id in ipairs(cjson.decode(raw_index)) do
        local record = connection_manager.get_connection(id)
        if record then
            found[id] = record
        end
    end
    return found
end
-- Return every connection record as { [connection_id] = record }.
-- The websocket_connections dict also holds "user:" indexes, "stats:"
-- counters and the "queue:" message queues written by
-- message_router.queue_message. Those are skipped, and only values that decode
-- to a table carrying a connection_id are returned as connections.
-- NOTE: get_keys(0) walks the whole dict (ngx_lua documents that it locks the
-- dict while doing so); cost grows with the number of keys.
function connection_manager.get_all_connections()
    local cache = ngx.shared.websocket_connections
    if not cache then
        return {}
    end

    local connections = {}
    for _, key in ipairs(cache:get_keys(0)) do
        local is_aux_key = key:find("^user:") or key:find("^stats:") or key:find("^queue:")
        if not is_aux_key then
            local raw = cache:get(key)
            if type(raw) == "string" then
                local ok, record = pcall(cjson.decode, raw)
                if ok and type(record) == "table" and record.connection_id then
                    connections[key] = record
                end
            end
        end
    end
    return connections
end
-- Stamp the connection's last_ping with the current time and re-store it,
-- which also renews its 1 h TTL. Returns true if the record existed.
function connection_manager.update_last_activity(connection_id)
    local dict = ngx.shared.websocket_connections
    if not dict then
        return false
    end

    local raw = dict:get(connection_id)
    if not raw then
        return false
    end

    local record = cjson.decode(raw)
    record.last_ping = ngx.time()
    dict:set(connection_id, cjson.encode(record), 3600)
    return true
end
-- Summarise connections: the running "stats:total_connections" counter, the
-- number of live connection records, distinct users online, and per-user
-- counts in connections_by_user ({ [user_id] = count }).
-- Returns {error = ...} when the shared dict is missing.
function connection_manager.get_connection_stats()
    local cache = ngx.shared.websocket_connections
    if not cache then
        return {error = "Shared memory not available"}
    end

    local stats = {
        total_connections = cache:get("stats:total_connections") or 0,
        active_connections = 0,
        users_online = 0,
        connections_by_user = {},
        timestamp = ngx.time()
    }

    local users = stats.connections_by_user
    for _, connection in pairs(connection_manager.get_all_connections()) do
        -- Count only records carrying a user_id: a nil user_id cannot be used
        -- as a table key and would raise "table index is nil".
        local user_id = type(connection) == "table" and connection.user_id or nil
        if user_id ~= nil then
            stats.active_connections = stats.active_connections + 1
            if users[user_id] == nil then
                users[user_id] = 0
                stats.users_online = stats.users_online + 1
            end
            users[user_id] = users[user_id] + 1
        end
    end
    return stats
end
-- Unregister connections whose last_ping is more than `timeout` seconds old.
-- @param timeout optional idle limit in seconds (default 300 = 5 minutes)
-- @return number of connections removed
function connection_manager.cleanup_expired_connections(timeout)
    local cache = ngx.shared.websocket_connections
    if not cache then
        return 0
    end

    timeout = timeout or 300
    local current_time = ngx.time()
    local cleaned_count = 0

    for connection_id, connection in pairs(connection_manager.get_all_connections()) do
        -- Records without a numeric last_ping are skipped instead of raising
        -- on the subtraction
        local last_ping = type(connection) == "table" and connection.last_ping
        if type(last_ping) == "number" and current_time - last_ping > timeout then
            connection_manager.unregister_connection(connection_id)
            cleaned_count = cleaned_count + 1
            ngx.log(ngx.INFO, "Cleaned expired connection: ", connection_id)
        end
    end
    return cleaned_count
end
-- Run cleanup_expired_connections every `interval` seconds (default 300)
-- using a self-rescheduling ngx.timer.at chain.
-- The cleanup runs under pcall, so an error in one pass is logged and the
-- next pass is still scheduled.
function connection_manager.start_cleanup_timer(interval)
    interval = interval or 300

    local function cleanup_timer(premature)
        -- premature is set by ngx_lua when the worker is exiting
        if premature then
            return
        end

        local ok, cleaned = pcall(connection_manager.cleanup_expired_connections)
        if not ok then
            ngx.log(ngx.ERR, "Connection cleanup failed: ", cleaned)
        elseif cleaned > 0 then
            ngx.log(ngx.INFO, "Cleaned ", cleaned, " expired connections")
        end

        local scheduled, err = ngx.timer.at(interval, cleanup_timer)
        if not scheduled then
            ngx.log(ngx.ERR, "Failed to create cleanup timer: ", err)
        end
    end

    local ok, err = ngx.timer.at(interval, cleanup_timer)
    if not ok then
        ngx.log(ngx.ERR, "Failed to create initial cleanup timer: ", err)
    end
end
return connection_manager
3.2 消息路由器
-- websocket/message_router.lua - 消息路由模块
local message_router = {}
local cjson = require "cjson"
local connection_manager = require "websocket.connection_manager"
-- 消息处理器注册表
local message_handlers = {}
-- Install (or replace) the handler for one message type.
-- The handler is called as handler(connection_info, message) and its return
-- value, if truthy, is sent back to the sender as the response.
function message_router.register_handler(message_type, handler)
    rawset(message_handlers, message_type, handler)
end
-- Dispatch `message` to the handler registered for message.type and return
-- its response. Non-table messages and unknown types produce an "error"
-- response instead of raising.
function message_router.route_message(connection_info, message)
    if type(message) ~= "table" then
        return {
            type = "error",
            error = "Message must be a JSON object",
            timestamp = ngx.time()
        }
    end

    local message_type = message.type
    local handler = message_handlers[message_type]
    if handler then
        return handler(connection_info, message)
    end

    -- message.type comes from client JSON and may be a boolean/table/nil,
    -- which neither `..` nor ngx.log accept, so stringify it first.
    local type_name = tostring(message_type)
    ngx.log(ngx.WARN, "No handler found for message type: ", type_name)
    return {
        type = "error",
        error = "Unknown message type: " .. type_name,
        timestamp = ngx.time()
    }
end
-- Built-in handlers registered when this module is loaded.

-- "ping" (an application-level JSON message, distinct from WebSocket ping
-- frames): answer with "pong" carrying the server clock.
message_router.register_handler("ping", function(_conn, _msg)
    local now = ngx.time()
    return { type = "pong", timestamp = now, server_time = now }
end)
-- "heartbeat": refresh the sender's last_ping and acknowledge.
message_router.register_handler("heartbeat", function(conn)
    connection_manager.update_last_activity(conn.connection_id)
    return { type = "heartbeat_ack", timestamp = ngx.time() }
end)
-- "get_online_users": list every user with at least one live connection,
-- together with that user's connection count.
message_router.register_handler("get_online_users", function(connection_info, message)
    local stats = connection_manager.get_connection_stats()
    -- get_connection_stats returns {error = ...} (no connections_by_user)
    -- when the shared dict is missing; report that instead of raising.
    if stats.error then
        return {
            type = "error",
            error = stats.error,
            timestamp = ngx.time()
        }
    end

    local online_users = {}
    for user_id, connection_count in pairs(stats.connections_by_user) do
        online_users[#online_users + 1] = {
            user_id = user_id,
            connection_count = connection_count
        }
    end

    return {
        type = "online_users",
        users = online_users,
        total_users = stats.users_online,
        timestamp = ngx.time()
    }
end)
-- "private_message": deliver `content` to every connection of target_user_id.
-- Replies "message_sent" with the number of connections the message was
-- queued for, or an error when fields are missing or the target is offline.
message_router.register_handler("private_message", function(connection_info, message)
    local target_user_id = message.target_user_id
    local content = message.content
    if not target_user_id or not content then
        return {
            type = "error",
            error = "target_user_id and content are required",
            timestamp = ngx.time()
        }
    end

    local target_connections = connection_manager.get_user_connections(target_user_id)
    if next(target_connections) == nil then
        return {
            type = "error",
            error = "Target user is not online",
            timestamp = ngx.time()
        }
    end

    local private_message = {
        type = "private_message",
        from_user_id = connection_info.user_id,
        from_username = connection_info.username,
        content = content,
        timestamp = ngx.time()
    }

    -- The target sockets belong to other coroutines, so the message is queued;
    -- only successfully queued copies are counted.
    local sent_count = 0
    for connection_id in pairs(target_connections) do
        if message_router.queue_message(connection_id, private_message) then
            sent_count = sent_count + 1
        end
    end

    return {
        type = "message_sent",
        target_user_id = target_user_id,
        sent_to_connections = sent_count,
        timestamp = ngx.time()
    }
end)
-- "broadcast": admin-only (connection_info.role == "admin"); queue `content`
-- for every other registered connection and report how many copies were
-- successfully queued.
message_router.register_handler("broadcast", function(connection_info, message)
    local content = message.content
    if not content then
        return {
            type = "error",
            error = "content is required",
            timestamp = ngx.time()
        }
    end

    if connection_info.role ~= "admin" then
        return {
            type = "error",
            error = "Permission denied",
            timestamp = ngx.time()
        }
    end

    local broadcast_message = {
        type = "broadcast",
        from_user_id = connection_info.user_id,
        from_username = connection_info.username,
        content = content,
        timestamp = ngx.time()
    }

    local sent_count = 0
    for connection_id in pairs(connection_manager.get_all_connections()) do
        -- Skip the sender; count only copies that were actually queued
        if connection_id ~= connection_info.connection_id
            and message_router.queue_message(connection_id, broadcast_message) then
            sent_count = sent_count + 1
        end
    end

    return {
        type = "broadcast_sent",
        sent_to_connections = sent_count,
        timestamp = ngx.time()
    }
end)
-- "join_room": join message.room_id and notify the other members with
-- "user_joined". message.password is forwarded for password-protected rooms.
message_router.register_handler("join_room", function(connection_info, message)
    local room_id = message.room_id
    if not room_id then
        return {
            type = "error",
            error = "room_id is required",
            timestamp = ngx.time()
        }
    end

    local room_manager = require "websocket.room_manager"
    -- room_manager.join_room rejects private rooms whose password does not
    -- match its 4th argument, so it must be passed through or such rooms
    -- could never be joined.
    local success, err = room_manager.join_room(room_id, connection_info.connection_id,
        connection_info.user_id, message.password)
    if not success then
        return {
            type = "error",
            error = err or "Failed to join room",
            timestamp = ngx.time()
        }
    end

    local join_notification = {
        type = "user_joined",
        room_id = room_id,
        user_id = connection_info.user_id,
        username = connection_info.username,
        timestamp = ngx.time()
    }
    room_manager.broadcast_to_room(room_id, join_notification, connection_info.connection_id)

    return {
        type = "room_joined",
        room_id = room_id,
        timestamp = ngx.time()
    }
end)
-- "leave_room": leave message.room_id, then notify the remaining members
-- with "user_left".
message_router.register_handler("leave_room", function(connection_info, message)
    local room_id = message.room_id
    if not room_id then
        return { type = "error", error = "room_id is required", timestamp = ngx.time() }
    end

    local room_manager = require "websocket.room_manager"
    local left, err = room_manager.leave_room(room_id, connection_info.connection_id)
    if not left then
        return { type = "error", error = err or "Failed to leave room", timestamp = ngx.time() }
    end

    room_manager.broadcast_to_room(room_id, {
        type = "user_left",
        room_id = room_id,
        user_id = connection_info.user_id,
        username = connection_info.username,
        timestamp = ngx.time(),
    })
    return { type = "room_left", room_id = room_id, timestamp = ngx.time() }
end)
-- "room_message": relay `content` to the other members of a room the sender
-- has joined; replies with how many members it was queued for.
message_router.register_handler("room_message", function(connection_info, message)
    local room_id, content = message.room_id, message.content
    if not (room_id and content) then
        return { type = "error", error = "room_id and content are required", timestamp = ngx.time() }
    end

    local room_manager = require "websocket.room_manager"
    local sender = connection_info.connection_id
    if not room_manager.is_user_in_room(room_id, sender) then
        return { type = "error", error = "You are not in this room", timestamp = ngx.time() }
    end

    local delivered = room_manager.broadcast_to_room(room_id, {
        type = "room_message",
        room_id = room_id,
        from_user_id = connection_info.user_id,
        from_username = connection_info.username,
        content = content,
        timestamp = ngx.time(),
    }, sender)

    return {
        type = "message_sent",
        room_id = room_id,
        sent_to_connections = delivered,
        timestamp = ngx.time(),
    }
end)
-- Per-connection outbound queues, used to hand a message to the coroutine that
-- owns a connection's socket.
--
-- Each queue is a shared-dict list under "queue:<connection_id>" in the
-- websocket_connections dict, holding JSON entries { message, timestamp }.
-- rpush/lpop are atomic, so concurrent writers in different workers no longer
-- overwrite each other, and nothing queued while a reader drains the queue is
-- lost (unlike a get / decode / modify / set or get-then-delete sequence).

local QUEUE_MAX_LENGTH = 100 -- oldest entries are dropped beyond this length
local QUEUE_TTL = 300        -- seconds an unread queue is kept (5 minutes)

-- Append `message` to the queue of `connection_id`.
-- Returns true on success, false otherwise.
function message_router.queue_message(connection_id, message)
    local cache = ngx.shared.websocket_connections
    if not cache then
        return false
    end

    local queue_key = "queue:" .. connection_id
    local length, err = cache:rpush(queue_key, cjson.encode({
        message = message,
        timestamp = ngx.time()
    }))
    if not length then
        ngx.log(ngx.ERR, "Failed to queue message for ", connection_id, ": ", err)
        return false
    end

    -- Keep the queue bounded by discarding the oldest entries
    while (cache:llen(queue_key) or 0) > QUEUE_MAX_LENGTH do
        cache:lpop(queue_key)
    end

    -- DICT:expire needs lua-nginx-module >= 0.10.11; without it the list is
    -- kept until drained or evicted.
    if cache.expire then
        cache:expire(queue_key, QUEUE_TTL)
    end
    return true
end

-- Remove and return every queued entry for `connection_id`, oldest first, as
-- a list of { message = ..., timestamp = ... } tables ({} when empty).
function message_router.get_queued_messages(connection_id)
    local cache = ngx.shared.websocket_connections
    if not cache then
        return {}
    end

    local queue_key = "queue:" .. connection_id
    local queue = {}
    while true do
        local raw = cache:lpop(queue_key)
        if not raw then
            break
        end
        local ok, entry = pcall(cjson.decode, raw)
        if ok and type(entry) == "table" then
            queue[#queue + 1] = entry
        end
    end
    return queue
end
return message_router
4. 房间管理
4.1 房间管理器
-- websocket/room_manager.lua - 房间管理模块
local room_manager = {}
local cjson = require "cjson"
local connection_manager = require "websocket.connection_manager"
-- Create a room in the websocket_rooms dict (1 h TTL).
-- @param room_config optional table:
--   max_users (default 100), is_private (default false), password,
--   description (default ""), is_permanent (default false; leave_room keeps
--   permanent rooms when their last member leaves)
-- @return true, or false plus an error message ("Room already exists", ...)
function room_manager.create_room(room_id, creator_user_id, room_config)
    local cache = ngx.shared.websocket_rooms
    if not cache then
        return false, "Shared memory not available"
    end

    local config = room_config or {}
    local room_info = {
        room_id = room_id,
        creator_user_id = creator_user_id,
        created_time = ngx.time(),
        max_users = config.max_users or 100,
        is_private = config.is_private or false,
        is_permanent = config.is_permanent or false,
        password = config.password,
        description = config.description or "",
        members = {},
        member_count = 0
    }

    -- add() only stores the key if it does not exist yet, so two concurrent
    -- creators cannot overwrite each other (a get-then-set check could).
    local ok, err = cache:add(room_id, cjson.encode(room_info), 3600)
    if not ok then
        if err == "exists" then
            return false, "Room already exists"
        end
        return false, err
    end

    ngx.log(ngx.INFO, "Room created: ", room_id, " by user: ", creator_user_id)
    return true
end
-- Delete a room on behalf of `user_id`; only the creator may do so.
-- Members get a "room_deleted" notification before the room is removed.
-- Returns true, or false plus an error message.
function room_manager.delete_room(room_id, user_id)
    local rooms = ngx.shared.websocket_rooms
    if not rooms then
        return false, "Shared memory not available"
    end

    local raw = rooms:get(room_id)
    if not raw then
        return false, "Room not found"
    end
    if cjson.decode(raw).creator_user_id ~= user_id then
        return false, "Permission denied"
    end

    -- Notify first: broadcast_to_room reads the member list from the dict
    room_manager.broadcast_to_room(room_id, {
        type = "room_deleted",
        room_id = room_id,
        timestamp = ngx.time(),
    })

    rooms:delete(room_id)
    ngx.log(ngx.INFO, "Room deleted: ", room_id)
    return true
end
-- Add connection_id (owned by user_id) to a room.
-- Checks, in order: room exists, room not full, password (only for private
-- rooms that have one), not already a member.
-- Returns true, or false plus an error message.
function room_manager.join_room(room_id, connection_id, user_id, password)
    local rooms = ngx.shared.websocket_rooms
    if not rooms then
        return false, "Shared memory not available"
    end

    local raw = rooms:get(room_id)
    if not raw then
        return false, "Room not found"
    end

    local room = cjson.decode(raw)
    local rejection
    if room.member_count >= room.max_users then
        rejection = "Room is full"
    elseif room.is_private and room.password and password ~= room.password then
        rejection = "Invalid password"
    elseif room.members[connection_id] then
        rejection = "Already in room"
    end
    if rejection then
        return false, rejection
    end

    room.members[connection_id] = { user_id = user_id, join_time = ngx.time() }
    room.member_count = room.member_count + 1

    local stored, set_err = rooms:set(room_id, cjson.encode(room), 3600)
    if not stored then
        return false, set_err
    end

    ngx.log(ngx.INFO, "User ", user_id, " joined room: ", room_id)
    return true
end
-- Remove connection_id from a room. When the last member leaves, the room is
-- deleted unless it is marked is_permanent.
-- Returns true, or false plus an error message.
function room_manager.leave_room(room_id, connection_id)
    local rooms = ngx.shared.websocket_rooms
    if not rooms then
        return false, "Shared memory not available"
    end

    local raw = rooms:get(room_id)
    if not raw then
        return false, "Room not found"
    end

    local room = cjson.decode(raw)
    local member = room.members[connection_id]
    if not member then
        return false, "Not in room"
    end

    room.members[connection_id] = nil
    room.member_count = room.member_count - 1

    if room.member_count ~= 0 or room.is_permanent then
        rooms:set(room_id, cjson.encode(room), 3600)
    else
        rooms:delete(room_id)
        ngx.log(ngx.INFO, "Empty room deleted: ", room_id)
    end

    ngx.log(ngx.INFO, "User ", member.user_id, " left room: ", room_id)
    return true
end
-- True when connection_id is listed in the room's members; false when it is
-- not, or when the room / shared dict does not exist.
function room_manager.is_user_in_room(room_id, connection_id)
    local rooms = ngx.shared.websocket_rooms
    local raw = rooms and rooms:get(room_id)
    if not raw then
        return false
    end
    return cjson.decode(raw).members[connection_id] ~= nil
end
-- Decoded room record, or nil when the room (or the shared dict) is missing.
function room_manager.get_room_info(room_id)
    local rooms = ngx.shared.websocket_rooms
    if rooms then
        local raw = rooms:get(room_id)
        if raw then
            return cjson.decode(raw)
        end
    end
    return nil
end
-- List the members of a room that still have a live connection record, as
-- { connection_id, user_id, username, join_time } entries. Order follows
-- pairs() and is unspecified; returns {} for an unknown room.
function room_manager.get_room_members(room_id)
    local listing = {}
    local room = room_manager.get_room_info(room_id)
    if not room then
        return listing
    end

    for connection_id, member in pairs(room.members) do
        local connection = connection_manager.get_connection(connection_id)
        if connection then
            listing[#listing + 1] = {
                connection_id = connection_id,
                user_id = member.user_id,
                username = connection.username,
                join_time = member.join_time,
            }
        end
    end
    return listing
end
-- Queue `message` for every member of the room except exclude_connection_id.
-- @return number of members the message was successfully queued for
--         (0 when the room does not exist)
function room_manager.broadcast_to_room(room_id, message, exclude_connection_id)
    local room_info = room_manager.get_room_info(room_id)
    if not room_info then
        return 0
    end

    local message_router = require "websocket.message_router"
    local sent_count = 0
    for connection_id in pairs(room_info.members) do
        -- Count only copies queue_message reports as stored
        if connection_id ~= exclude_connection_id
            and message_router.queue_message(connection_id, message) then
            sent_count = sent_count + 1
        end
    end
    return sent_count
end
-- Summaries of all non-private rooms:
-- { room_id, description, member_count, max_users, created_time }.
-- Private rooms are omitted entirely; order is unspecified.
function room_manager.get_all_rooms()
    local rooms = ngx.shared.websocket_rooms
    if not rooms then
        return {}
    end

    local listing = {}
    for _, room_id in ipairs(rooms:get_keys(0)) do
        local raw = rooms:get(room_id)
        local room = raw and cjson.decode(raw)
        if room and not room.is_private then
            listing[#listing + 1] = {
                room_id = room_id,
                description = room.description,
                member_count = room.member_count,
                max_users = room.max_users,
                created_time = room.created_time,
            }
        end
    end
    return listing
end
-- Aggregate counts over all rooms (total/public/private rooms, total
-- members) plus a per-room summary keyed by room_id.
-- Returns {error = ...} when the shared dict is missing.
function room_manager.get_room_stats()
    local rooms = ngx.shared.websocket_rooms
    if not rooms then
        return {error = "Shared memory not available"}
    end

    local stats = {
        total_rooms = 0,
        public_rooms = 0,
        private_rooms = 0,
        total_members = 0,
        rooms = {},
        timestamp = ngx.time(),
    }

    for _, room_id in ipairs(rooms:get_keys(0)) do
        local raw = rooms:get(room_id)
        if raw then
            local room = cjson.decode(raw)
            local bucket = room.is_private and "private_rooms" or "public_rooms"
            stats.total_rooms = stats.total_rooms + 1
            stats.total_members = stats.total_members + room.member_count
            stats[bucket] = stats[bucket] + 1
            stats.rooms[room_id] = {
                member_count = room.member_count,
                max_users = room.max_users,
                is_private = room.is_private,
                created_time = room.created_time,
            }
        end
    end
    return stats
end
return room_manager
5. 实时应用示例
5.1 聊天应用
-- websocket/chat.lua - 聊天应用模块
local chat = {}
local server = require "resty.websocket.server"
local cjson = require "cjson"
local connection_manager = require "websocket.connection_manager"
local room_manager = require "websocket.room_manager"
local message_router = require "websocket.message_router"
-- Handle a /ws/chat connection.
-- Chat uses the same handshake, authentication, connection registration,
-- frame loop and cleanup as websocket.handler. The only chat-specific part is
-- the extra message types installed by register_chat_handlers (emoji, file,
-- typing status). Reusing the handler also guarantees that routed handlers
-- receive a real connection_info record instead of an undefined global.
function chat.handle_chat_connection()
    -- register_handler just (re)assigns entries in the router's table, so
    -- doing it per connection is harmless.
    chat.register_chat_handlers()

    local ws_handler = require "websocket.handler"
    return ws_handler.handle_connection()
end
-- Register the chat-only message types on the shared message router:
-- "emoji_message", "file_message" and "typing_status". Each relays a payload
-- to the other members of a room. Like the core "room_message" handler, the
-- sender must already be a member of that room.
function chat.register_chat_handlers()
    -- Error response when the sender is not in room_id, nil when it is.
    local function membership_error(connection_info, room_id)
        if room_manager.is_user_in_room(room_id, connection_info.connection_id) then
            return nil
        end
        return {
            type = "error",
            error = "You are not in this room",
            timestamp = ngx.time()
        }
    end

    -- Emoji message
    message_router.register_handler("emoji_message", function(connection_info, message)
        local room_id = message.room_id
        local emoji = message.emoji
        if not room_id or not emoji then
            return {
                type = "error",
                error = "room_id and emoji are required",
                timestamp = ngx.time()
            }
        end
        local denied = membership_error(connection_info, room_id)
        if denied then
            return denied
        end

        local emoji_message = {
            type = "emoji_message",
            room_id = room_id,
            from_user_id = connection_info.user_id,
            from_username = connection_info.username,
            emoji = emoji,
            timestamp = ngx.time()
        }
        local sent_count = room_manager.broadcast_to_room(room_id, emoji_message, connection_info.connection_id)
        return {
            type = "message_sent",
            sent_to_connections = sent_count,
            timestamp = ngx.time()
        }
    end)

    -- File-share message (file_info is relayed as-is; no upload happens here)
    message_router.register_handler("file_message", function(connection_info, message)
        local room_id = message.room_id
        local file_info = message.file_info
        if not room_id or not file_info then
            return {
                type = "error",
                error = "room_id and file_info are required",
                timestamp = ngx.time()
            }
        end
        local denied = membership_error(connection_info, room_id)
        if denied then
            return denied
        end

        local file_message = {
            type = "file_message",
            room_id = room_id,
            from_user_id = connection_info.user_id,
            from_username = connection_info.username,
            file_info = file_info,
            timestamp = ngx.time()
        }
        local sent_count = room_manager.broadcast_to_room(room_id, file_message, connection_info.connection_id)
        return {
            type = "message_sent",
            sent_to_connections = sent_count,
            timestamp = ngx.time()
        }
    end)

    -- "is typing" indicator
    message_router.register_handler("typing_status", function(connection_info, message)
        local room_id = message.room_id
        local is_typing = message.is_typing
        if not room_id then
            return {
                type = "error",
                error = "room_id is required",
                timestamp = ngx.time()
            }
        end
        local denied = membership_error(connection_info, room_id)
        if denied then
            return denied
        end

        local typing_message = {
            type = "typing_status",
            room_id = room_id,
            user_id = connection_info.user_id,
            username = connection_info.username,
            is_typing = is_typing,
            timestamp = ngx.time()
        }
        room_manager.broadcast_to_room(room_id, typing_message, connection_info.connection_id)
        return {
            type = "typing_status_sent",
            timestamp = ngx.time()
        }
    end)
end
return chat
5.2 实时推送服务
-- websocket/push.lua - 推送服务模块
local push = {}
local server = require "resty.websocket.server"
local cjson = require "cjson"
local redis = require "resty.redis"
local connection_manager = require "websocket.connection_manager"
-- recv_frame() timeout for push connections, in ms. Each timeout wakes the
-- loop, so the Redis push queue is polled about once per second.
local PUSH_POLL_MS = 1000
-- Ping a client after this many seconds without any frame from it.
local PUSH_IDLE_PING_SECONDS = 30

-- Handle one /ws/push connection for the lifetime of the request.
--
-- The client is authenticated and registered with connection_manager, which is
-- how push.send_to_user finds its connections through get_user_connections.
-- The loop then alternates between sending pending pushes and reading client
-- frames. Pushes are sent from this request's own coroutine because the
-- lua-nginx-module docs forbid handing a cosocket to an ngx.timer callback,
-- so the timer-based push.start_push_timer is not used.
-- NOTE(review): push.subscribe_user_channels is not called; nothing in the
-- visible code consumes those Redis subscriptions.
function push.handle_push_connection()
    local wb, err = server:new({
        timeout = PUSH_POLL_MS,
        max_payload_len = 65535,
    })
    if not wb then
        ngx.log(ngx.ERR, "Failed to create WebSocket server: ", err)
        return ngx.exit(444)
    end

    local auth = require "websocket.auth"
    local auth_result = auth.authenticate_websocket()
    if not auth_result.success then
        -- 1008 = Policy Violation
        wb:send_close(1008, auth_result.error or "Authentication failed")
        return
    end
    local user = auth_result.user

    -- Worker pid included because math.random() is not necessarily seeded
    -- per worker
    local connection_id = string.format("push-%s-%d-%d-%d",
        ngx.var.remote_addr, ngx.time(), ngx.worker.pid(), math.random(10000, 99999))
    local connection_info = {
        connection_id = connection_id,
        user_id = user.user_id,
        username = user.username,
        role = user.role,
        remote_addr = ngx.var.remote_addr,
        user_agent = ngx.var.http_user_agent,
        connect_time = ngx.time(),
        last_ping = ngx.time(),
        status = "connected"
    }
    connection_manager.register_connection(connection_info)

    local last_seen = ngx.time()
    while true do
        -- 1. Deliver everything waiting in this connection's push queue
        local send_failed = false
        for _, message in ipairs(push.get_pending_messages(connection_id)) do
            local ok, send_err = wb:send_text(cjson.encode(message))
            if not ok then
                ngx.log(ngx.ERR, "Failed to send push message: ", send_err)
                send_failed = true
                break
            end
        end
        if send_failed then
            break
        end

        -- 2. Read one client frame (or time out after PUSH_POLL_MS)
        local data, typ, recv_err = wb:recv_frame()
        if wb.fatal then
            ngx.log(ngx.WARN, "WebSocket receive error: ", recv_err)
            break
        end

        if not data then
            -- Timeout: client idle; ping occasionally so it stays registered
            if ngx.time() - last_seen >= PUSH_IDLE_PING_SECONDS then
                local pinged, ping_err = wb:send_ping()
                if not pinged then
                    ngx.log(ngx.ERR, "Failed to send ping: ", ping_err)
                    break
                end
                last_seen = ngx.time()
            end
        else
            last_seen = ngx.time()
            connection_manager.update_last_activity(connection_id)

            if typ == "close" then
                wb:send_close()
                break
            elseif typ == "ping" then
                local ponged, pong_err = wb:send_pong(data)
                if not ponged then
                    ngx.log(ngx.ERR, "Failed to send pong: ", pong_err)
                    break
                end
            elseif typ == "text" then
                local decoded, message = pcall(cjson.decode, data)
                -- Only JSON objects can be subscribe/unsubscribe requests
                if decoded and type(message) == "table" then
                    local handled, handle_err = pcall(push.handle_push_message,
                        connection_info, message, wb)
                    if not handled then
                        ngx.log(ngx.ERR, "Failed to handle push message: ", handle_err)
                    end
                end
            end
        end
    end

    connection_manager.unregister_connection(connection_id)
end
-- SUBSCRIBE a Redis connection to the user's personal channel
-- ("user:<user_id>"), the role channel ("role:<role>", if any) and "global".
-- NOTE(review): nothing in the visible code reads messages from these
-- subscriptions; pushes reach clients through the "push_queue:<connection_id>"
-- lists instead.
-- lua-resty-redis refuses set_keepalive on a connection in subscribed state,
-- so the connection is closed when it cannot be pooled instead of being left
-- open with undelivered published messages piling up.
function push.subscribe_user_channels(connection_info)
    if not connection_info or connection_info.user_id == nil then
        ngx.log(ngx.WARN, "subscribe_user_channels called without a user")
        return
    end

    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        ngx.log(ngx.ERR, "Failed to connect to Redis: ", err)
        return
    end

    local channels = { "user:" .. connection_info.user_id }
    if connection_info.role then
        channels[#channels + 1] = "role:" .. connection_info.role
    end
    channels[#channels + 1] = "global"

    for _, channel in ipairs(channels) do
        local res, sub_err = red:subscribe(channel)
        if not res then
            ngx.log(ngx.ERR, "Failed to subscribe to ", channel, ": ", sub_err)
            break
        end
    end

    if not red:set_keepalive(10000, 100) then
        red:close()
    end
end
-- Start a self-rescheduling ngx.timer chain that, once per second, pops this
-- connection's pending pushes (push.get_pending_messages) and sends each one
-- as a JSON text frame on `wb`.
-- The chain stops when the timer fires prematurely, when a send fails
-- (returns before rescheduling), or when a new timer cannot be created.
-- NOTE(review): the lua-nginx-module docs say a cosocket (which `wb` wraps)
-- must not be passed to an ngx.timer callback, because timers run outside the
-- request that created it, so wb:send_text here is expected to fail.
-- Also, while the queue stays empty nothing ends the chain, so it can keep
-- polling Redis every second after the connection has closed. Verify before use.
function push.start_push_timer(connection_id, wb)
local function push_timer(premature)
-- premature: set by ngx_lua when the worker is exiting (per its docs)
if premature then
return
end
-- Fetch everything currently waiting in this connection's push queue
local messages = push.get_pending_messages(connection_id)
for _, message in ipairs(messages) do
local ok, err = wb:send_text(cjson.encode(message))
if not ok then
ngx.log(ngx.ERR, "Failed to send push message: ", err)
return
end
end
-- Schedule the next check
local ok, err = ngx.timer.at(1, push_timer) -- check once per second
if not ok then
ngx.log(ngx.ERR, "Failed to create push timer: ", err)
end
end
local ok, err = ngx.timer.at(1, push_timer)
if not ok then
ngx.log(ngx.ERR, "Failed to create initial push timer: ", err)
end
end
-- Pop every entry from the Redis list "push_queue:<connection_id>" and return
-- the ones that decode as JSON, oldest first. Returns {} when Redis cannot be
-- reached.
function push.get_pending_messages(connection_id)
    local red = redis:new()
    red:set_timeout(1000)
    if not red:connect("127.0.0.1", 6379) then
        return {}
    end

    local key = "push_queue:" .. connection_id
    local pending = {}
    local raw = red:lpop(key)
    -- lpop yields ngx.null for an empty list and nil on error
    while raw and raw ~= ngx.null do
        local decoded_ok, decoded = pcall(cjson.decode, raw)
        if decoded_ok then
            pending[#pending + 1] = decoded
        end
        raw = red:lpop(key)
    end

    red:set_keepalive(10000, 100)
    return pending
end
--- Handle a control message sent by a push client.
-- Supported `message.type` values are "subscribe" and "unsubscribe". Each
-- needs a non-empty string `message.channel`; anything else is ignored. On
-- success an acknowledgement ("subscribed"/"unsubscribed") is sent over `wb`.
-- NOTE(review): push.subscribe_channel / push.unsubscribe_channel are not
-- defined in this part of the file — confirm they exist in the push module.
-- @tparam table connection_info must contain `connection_id`
-- @tparam table message decoded client message
-- @param wb resty.websocket.server object used for the reply
function push.handle_push_message(connection_info, message, wb)
    local handler, ack_type
    if message.type == "subscribe" then
        handler, ack_type = push.subscribe_channel, "subscribed"
    elseif message.type == "unsubscribe" then
        handler, ack_type = push.unsubscribe_channel, "unsubscribed"
    else
        return
    end

    -- JSON null decodes to the truthy cjson.null, so check the type instead
    -- of relying on `if channel then`.
    local channel = message.channel
    if type(channel) ~= "string" or channel == "" then
        return
    end

    handler(connection_info.connection_id, channel)

    local bytes, err = wb:send_text(cjson.encode({
        type = ack_type,
        channel = channel,
        timestamp = ngx.time()
    }))
    if not bytes then
        ngx.log(ngx.ERR, "Failed to send ", ack_type, " acknowledgement: ", err)
    end
end
--- Queue a push message for every connection of a user.
-- Appends the JSON-encoded message to each connection's
-- "push_queue:<connection_id>" list and sets the list to expire after `ttl`
-- seconds. All commands go out as one Redis pipeline.
-- @param user_id user whose connections come from connection_manager.get_user_connections
-- @tparam table message JSON-encodable payload
-- @tparam[opt=300] number ttl queue expiry in seconds
-- @return true on success (also when the user has no connections), or false and an error
function push.send_to_user(user_id, message, ttl)
    ttl = ttl or 300

    local user_connections = connection_manager.get_user_connections(user_id)
    if type(user_connections) ~= "table" or next(user_connections) == nil then
        return true -- nothing to deliver to
    end

    -- Encode once instead of once per connection.
    local payload = cjson.encode(message)

    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return false, err
    end

    red:init_pipeline()
    for connection_id in pairs(user_connections) do
        local queue_key = "push_queue:" .. connection_id
        red:rpush(queue_key, payload)
        red:expire(queue_key, ttl)
    end
    local results, pipe_err = red:commit_pipeline()
    if not results then
        red:close()
        return false, pipe_err
    end

    red:set_keepalive(10000, 100)
    -- lua-resty-redis reports a failed pipelined command as {false, err}.
    for _, res in ipairs(results) do
        if type(res) == "table" and res[1] == false then
            return false, res[2]
        end
    end
    return true
end
--- Publish a message to a Redis pub/sub channel.
-- @tparam string channel
-- @tparam table message JSON-encodable payload
-- @return true and the number of subscribers that received it, or false and an error
function push.send_to_channel(channel, message)
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return false, err
    end

    local receivers, pub_err = red:publish(channel, cjson.encode(message))
    if not receivers then
        -- Previously the failure was ignored and true was returned anyway.
        red:close()
        return false, pub_err
    end

    red:set_keepalive(10000, 100)
    return true, receivers
end
--- Broadcast a message to every subscriber of the "global" channel.
-- A thin wrapper over push.send_to_channel; returns whatever it returns.
-- @tparam table message JSON-encodable payload
function push.broadcast_global(message)
    local global_channel = "global"
    return push.send_to_channel(global_channel, message)
end
return push
5.3 在线游戏服务
-- websocket/game.lua - 游戏服务模块
local game = {}
local server = require "resty.websocket.server"
local cjson = require "cjson"
local connection_manager = require "websocket.connection_manager"
local room_manager = require "websocket.room_manager"
-- Game state, keyed by game_room_id (filled in by the create_game handler).
-- NOTE(review): a module-level Lua table lives in one nginx worker's Lua VM,
-- so this state is not shared between workers. Players of one game must hit
-- the same worker, or the state must move to shared storage — confirm.
local game_rooms = {}
--- Handle one game WebSocket connection.
-- Upgrades the request and builds the per-connection descriptor. It makes
-- sure the game handlers are registered once per loaded module rather than
-- on every connection. It then dispatches decoded JSON text frames until
-- the client disconnects. Idle read timeouts do not close the connection,
-- pings are answered, and a close frame is echoed before returning.
-- NOTE(review): user_id/username are read from the query string without
-- authentication, and the connection is not registered with
-- connection_manager here — confirm how room broadcasts reach it.
-- NOTE(review): game.handle_game_message is not defined in this module;
-- presumably it should dispatch through message_router — confirm.
function game.handle_game_connection()
    local wb, err = server:new({
        timeout = 5000,
        max_payload_len = 65535,
    })
    if not wb then
        ngx.log(ngx.ERR, "Failed to create WebSocket server: ", err)
        return ngx.exit(444)
    end

    local args = ngx.req.get_uri_args()
    local user_id = args.user_id
    -- Repeated args arrive as a table, valueless ones as true.
    if type(user_id) ~= "string" or user_id == "" then
        wb:send_close(1008, "user_id is required")
        return
    end
    -- The previous version used an undefined `connection_info` global.
    local connection_info = {
        connection_id = string.format("game-%s-%d-%d",
            ngx.var.remote_addr, ngx.time(), math.random(10000, 99999)),
        user_id = user_id,
        username = type(args.username) == "string" and args.username or user_id,
        connect_time = ngx.time(),
    }

    if not game._handlers_registered then
        game.register_game_handlers()
        game._handlers_registered = true
    end

    while true do
        local data, typ, recv_err = wb:recv_frame()
        if not data then
            -- A read timeout only means the client was idle; keep waiting.
            if not (recv_err and string.find(recv_err, "timeout", 1, true)) then
                break
            end
        elseif typ == "close" then
            wb:send_close()
            break
        elseif typ == "ping" then
            wb:send_pong(data)
        elseif typ == "text" then
            local ok, message = pcall(cjson.decode, data)
            if ok and type(message) == "table" then
                local handler = game.handle_game_message
                if handler then
                    handler(connection_info, message, wb)
                else
                    ngx.log(ngx.ERR, "game.handle_game_message is not defined; dropping message")
                end
            end
        end
    end
end
-- Build the standard error reply returned by the game handlers.
local function game_error(reason)
    return {
        type = "error",
        error = reason,
        timestamp = ngx.time()
    }
end

--- Register the game message handlers with the message router.
-- Each handler is called as fn(connection_info, message) and returns the
-- reply table for the sender:
--   create_game  - create a room (message.game_type, optional message.max_players)
--   join_game    - join a room that is still "waiting" (message.game_room_id)
--   player_ready - set the caller's ready flag; starts the game once every
--                  player (at least two) is ready and no game is running
--   game_action  - apply a move while the game is "playing"
-- NOTE(review): `message_router` was used here without ever being required.
-- The path below follows the websocket.* naming of this module's other
-- requires — confirm it matches the real router module.
function game.register_game_handlers()
    local message_router = require "websocket.message_router"

    -- Create a game room.
    message_router.register_handler("create_game", function(connection_info, message)
        -- game_type is concatenated into the description below; JSON null
        -- decodes to the truthy cjson.null, so check the type explicitly.
        local game_type = message.game_type
        if type(game_type) ~= "string" or game_type == "" then
            return game_error("game_type is required")
        end
        local max_players = message.max_players
        if max_players == nil then
            max_players = 2
        end
        if type(max_players) ~= "number" or max_players % 1 ~= 0 or max_players < 2 then
            return game_error("max_players must be an integer >= 2")
        end

        local game_room_id = "game_" .. ngx.time() .. "_" .. math.random(1000, 9999)
        local success, err = room_manager.create_room(game_room_id, connection_info.user_id, {
            max_users = max_players,
            is_private = false,
            description = "Game Room: " .. game_type
        })
        if not success then
            return game_error(err or "Failed to create game")
        end

        game_rooms[game_room_id] = {
            game_type = game_type,
            max_players = max_players,
            current_players = 0,
            game_state = "waiting",
            players = {},
            created_time = ngx.time()
        }
        return {
            type = "game_created",
            game_room_id = game_room_id,
            game_type = game_type,
            max_players = max_players,
            timestamp = ngx.time()
        }
    end)

    -- Join a game.
    message_router.register_handler("join_game", function(connection_info, message)
        local game_room_id = message.game_room_id
        if type(game_room_id) ~= "string" or game_room_id == "" then
            return game_error("game_room_id is required")
        end
        local game_room = game_rooms[game_room_id]
        if not game_room then
            return game_error("Game room not found")
        end
        local user_id = connection_info.user_id
        if user_id == nil then
            return game_error("user_id is required")
        end
        -- players is keyed by user_id: a second join would bump
        -- current_players without adding a player.
        if game_room.players[user_id] then
            return game_error("Player already in game")
        end
        if game_room.game_state ~= "waiting" then
            return game_error("Game already started")
        end
        if game_room.current_players >= game_room.max_players then
            return game_error("Game room is full")
        end

        local success, err = room_manager.join_room(game_room_id, connection_info.connection_id, user_id)
        if not success then
            return game_error(err or "Failed to join game")
        end

        game_room.players[user_id] = {
            username = connection_info.username,
            connection_id = connection_info.connection_id,
            join_time = ngx.time(),
            ready = false
        }
        game_room.current_players = game_room.current_players + 1

        -- Tell the other players (the joining connection is excluded).
        room_manager.broadcast_to_room(game_room_id, {
            type = "player_joined",
            game_room_id = game_room_id,
            player = {
                user_id = user_id,
                username = connection_info.username
            },
            current_players = game_room.current_players,
            max_players = game_room.max_players,
            timestamp = ngx.time()
        }, connection_info.connection_id)

        return {
            type = "game_joined",
            game_room_id = game_room_id,
            game_type = game_room.game_type,
            current_players = game_room.current_players,
            max_players = game_room.max_players,
            timestamp = ngx.time()
        }
    end)

    -- Player ready toggle.
    message_router.register_handler("player_ready", function(connection_info, message)
        local game_room_id = message.game_room_id
        local game_room = game_rooms[game_room_id]
        if not game_room then
            return game_error("Game room not found")
        end
        local player = game_room.players[connection_info.user_id]
        if not player then
            return game_error("Player not in game")
        end

        -- Normalise to a boolean; JSON null (cjson.null) is truthy in Lua and
        -- must not count as "ready".
        local ready = message.ready
        ready = ready ~= nil and ready ~= false and ready ~= cjson.null
        player.ready = ready

        local all_ready = true
        local ready_count = 0
        for _, p in pairs(game_room.players) do
            if p.ready then
                ready_count = ready_count + 1
            else
                all_ready = false
            end
        end

        room_manager.broadcast_to_room(game_room_id, {
            type = "player_ready_status",
            game_room_id = game_room_id,
            player_id = connection_info.user_id,
            ready = ready,
            ready_count = ready_count,
            total_players = game_room.current_players,
            timestamp = ngx.time()
        })

        -- Never restart a running game: start_game would reset the board.
        if all_ready and game_room.current_players >= 2
            and game_room.game_state ~= "playing" then
            game.start_game(game_room_id)
        end

        return {
            type = "ready_status_updated",
            ready = ready,
            timestamp = ngx.time()
        }
    end)

    -- Game move.
    message_router.register_handler("game_action", function(connection_info, message)
        local game_room_id = message.game_room_id
        local action = message.action
        local game_room = game_rooms[game_room_id]
        if not game_room then
            return game_error("Game room not found")
        end
        if game_room.game_state ~= "playing" then
            return game_error("Game is not in playing state")
        end
        -- Only seated players may act.
        if not game_room.players[connection_info.user_id] then
            return game_error("Player not in game")
        end
        if type(action) ~= "table" then
            return game_error("Invalid action")
        end

        local result = game.process_game_action(game_room_id, connection_info.user_id, action)
        if not result.success then
            return game_error(result.error)
        end

        room_manager.broadcast_to_room(game_room_id, {
            type = "game_update",
            game_room_id = game_room_id,
            action = action,
            player_id = connection_info.user_id,
            game_state = result.game_state,
            timestamp = ngx.time()
        })
        return {
            type = "action_processed",
            timestamp = ngx.time()
        }
    end)
end
--- Move a game room into the "playing" state and announce it.
-- Records start_time and seeds per-game state. For tic_tac_toe this is an
-- empty nine-cell board (0 = empty) with player 1 to move. For
-- rock_paper_scissors it is round 1 and an empty choice table. It then
-- broadcasts "game_started" to the room.
-- @tparam string game_room_id
-- @treturn boolean false when the room is unknown, true otherwise
function game.start_game(game_room_id)
    local room = game_rooms[game_room_id]
    if not room then
        return false
    end

    room.game_state = "playing"
    room.start_time = ngx.time()

    local kind = room.game_type
    if kind == "tic_tac_toe" then
        room.board = {0, 0, 0, 0, 0, 0, 0, 0, 0}
        room.current_turn = 1
    elseif kind == "rock_paper_scissors" then
        room.round = 1
        room.player_choices = {}
    end

    room_manager.broadcast_to_room(game_room_id, {
        type = "game_started",
        game_room_id = game_room_id,
        game_type = room.game_type,
        players = room.players,
        timestamp = ngx.time()
    })
    return true
end
--- Dispatch a player action to the rules of the room's game type.
-- @tparam string game_room_id
-- @param player_id user id of the acting player
-- @tparam table action game-specific action payload
-- @treturn table {success = true, game_state = ...} or {success = false, error = ...}
function game.process_game_action(game_room_id, player_id, action)
    local game_room = game_rooms[game_room_id]
    if not game_room then
        return {success = false, error = "Game room not found"}
    end
    if type(action) ~= "table" then
        return {success = false, error = "Invalid action"}
    end

    local processor
    if game_room.game_type == "tic_tac_toe" then
        processor = game.process_tic_tac_toe_action
    elseif game_room.game_type == "rock_paper_scissors" then
        -- NOTE(review): process_rps_action is not defined in this module;
        -- report that instead of calling a nil value.
        processor = game.process_rps_action
    else
        return {success = false, error = "Unknown game type"}
    end
    if not processor then
        return {success = false, error = "Game type not implemented"}
    end
    return processor(game_room, player_id, action)
end
--- Apply a tic-tac-toe move for `player_id`.
-- Players are numbered by join order (join_time, then user id as a
-- tie-break). This keeps the numbering stable between calls, because
-- pairs() order is unspecified. Users who are not in game_room.players are
-- rejected rather than treated as player 1.
-- @tparam table game_room room state with players, board and current_turn
-- @param player_id acting user id
-- @tparam table action must contain an integer `position` in 1..9
-- @treturn table {success = true, game_state = {...}} or {success = false, error = ...}
function game.process_tic_tac_toe_action(game_room, player_id, action)
    if not game_room.board then
        return {success = false, error = "Game not started"}
    end

    -- Reject non-numbers (comparing a string with 1 would raise) and
    -- fractional or non-finite positions.
    local position = type(action) == "table" and action.position or nil
    if type(position) ~= "number" or position % 1 ~= 0
        or position < 1 or position > 9 then
        return {success = false, error = "Invalid position"}
    end
    if game_room.board[position] ~= 0 then
        return {success = false, error = "Position already taken"}
    end

    local players = game_room.players
    local order = {}
    for uid in pairs(players) do
        order[#order + 1] = uid
    end
    table.sort(order, function(a, b)
        local ta = players[a].join_time or 0
        local tb = players[b].join_time or 0
        if ta ~= tb then
            return ta < tb
        end
        return tostring(a) < tostring(b)
    end)

    local player_number
    for index, uid in ipairs(order) do
        if uid == player_id then
            player_number = index
            break
        end
    end
    if not player_number then
        return {success = false, error = "Player not in game"}
    end
    if player_number ~= game_room.current_turn then
        return {success = false, error = "Not your turn"}
    end

    game_room.board[position] = player_number

    local winner = game.check_tic_tac_toe_winner(game_room.board)
    if winner then
        game_room.game_state = "finished"
        game_room.winner = winner
    elseif game.is_board_full(game_room.board) then
        game_room.game_state = "finished"
        game_room.winner = "draw"
    else
        game_room.current_turn = game_room.current_turn == 1 and 2 or 1
    end

    return {
        success = true,
        game_state = {
            board = game_room.board,
            current_turn = game_room.current_turn,
            game_status = game_room.game_state,
            winner = game_room.winner
        }
    }
end
-- The eight winning lines of a 3x3 board numbered 1..9 row by row:
-- three rows, three columns and two diagonals.
local TIC_TAC_TOE_LINES = {
    {1, 2, 3}, {4, 5, 6}, {7, 8, 9},
    {1, 4, 7}, {2, 5, 8}, {3, 6, 9},
    {1, 5, 9}, {3, 5, 7},
}

--- Find the player that owns a complete line.
-- @tparam table board nine cells; 0 marks an empty cell
-- @return the value in the first fully-owned line, or nil if there is none
function game.check_tic_tac_toe_winner(board)
    for i = 1, #TIC_TAC_TOE_LINES do
        local line = TIC_TAC_TOE_LINES[i]
        local owner = board[line[1]]
        if owner ~= 0 and owner == board[line[2]] and owner == board[line[3]] then
            return owner
        end
    end
    return nil
end
--- Report whether all nine board cells are occupied.
-- @tparam table board nine cells; 0 marks an empty cell
-- @treturn boolean false as soon as an empty cell is found, true otherwise
function game.is_board_full(board)
    local cell = 1
    while cell <= 9 do
        if board[cell] == 0 then
            return false
        end
        cell = cell + 1
    end
    return true
end
return game
6. 性能优化与监控
6.1 性能优化配置
# nginx.conf - WebSocket performance tuning
worker_processes auto;
# Per-worker limit on open file descriptors; every open WebSocket client
# holds one descriptor for the lifetime of the connection.
worker_rlimit_nofile 65535;
events {
# Maximum simultaneous connections per worker (client connections plus any
# outgoing ones the worker opens).
worker_connections 65535;
use epoll;
multi_accept on;
}
http {
# WebSocket timeouts.
# NOTE(review): proxy_*_timeout only apply when nginx proxies WebSocket
# traffic to an upstream via proxy_pass; connections handled in Lua by
# resty.websocket.server are governed by that object's `timeout` option.
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
# HTTP keep-alive between requests (not the lifetime of an upgraded
# WebSocket connection).
keepalive_timeout 65;
keepalive_requests 1000;
# Request buffer sizes.
client_body_buffer_size 128k;
client_max_body_size 10m;
client_header_buffer_size 32k;
large_client_header_buffers 4 32k;
# Lua settings.
lua_code_cache on;
# NOTE(review): limits on running / pending ngx.timer callbacks per worker;
# any design that creates a timer per connection must size these to the
# expected connection count.
lua_max_running_timers 256;
lua_max_pending_timers 1024;
# Shared memory zones; names must match the ngx.shared.* dicts the Lua
# modules use (websocket_stats is read and written by the monitor module).
lua_shared_dict websocket_connections 500m;
lua_shared_dict websocket_rooms 200m;
lua_shared_dict websocket_users 200m;
lua_shared_dict websocket_stats 100m;
}
6.2 监控指标收集
-- websocket/monitor.lua - 监控模块
local monitor = {}
local cjson = require "cjson"
-- Return tbl[key] when it is a number and 0 otherwise, so a stat missing
-- from a manager's report cannot break arithmetic or formatting later.
local function numeric_stat(tbl, key)
    local value = type(tbl) == "table" and tbl[key] or nil
    if type(value) == "number" then
        return value
    end
    return 0
end

--- Collect a snapshot of WebSocket metrics.
-- Connection and room counts come from
-- connection_manager.get_connection_stats() and room_manager.get_room_stats();
-- missing fields are reported as 0. memory_usage is the current Lua VM's
-- heap size in KB (collectgarbage("count")). uptime is the number of seconds
-- since the "start_time" key of the websocket_stats shared dict, or 0 when
-- that key or the dict is missing. The snapshot is also cached as JSON under
-- "current_metrics" for 300 seconds.
-- @treturn table metrics snapshot
function monitor.collect_metrics()
    -- Neither manager was required by this module before.
    local connection_manager = require "websocket.connection_manager"
    local room_manager = require "websocket.room_manager"

    local connection_stats = connection_manager.get_connection_stats()
    local room_stats = room_manager.get_room_stats()
    local stats_dict = ngx.shared.websocket_stats
    local now = ngx.time()

    -- `now - start or 0` parses as `(now - start) or 0`: the fallback never
    -- applied and a missing start_time raised an arithmetic error.
    local uptime = 0
    local start_time = stats_dict and stats_dict:get("start_time")
    if type(start_time) == "number" then
        uptime = now - start_time
    end

    local metrics = {
        timestamp = now,
        connections = {
            total = numeric_stat(connection_stats, "total_connections"),
            active = numeric_stat(connection_stats, "active_connections"),
            users_online = numeric_stat(connection_stats, "users_online")
        },
        rooms = {
            total = numeric_stat(room_stats, "total_rooms"),
            public = numeric_stat(room_stats, "public_rooms"),
            private = numeric_stat(room_stats, "private_rooms"),
            total_members = numeric_stat(room_stats, "total_members")
        },
        performance = {
            memory_usage = collectgarbage("count"),
            uptime = uptime
        }
    }

    if stats_dict then
        local ok, err = stats_dict:set("current_metrics", cjson.encode(metrics), 300)
        if not ok then
            ngx.log(ngx.WARN, "Failed to cache metrics: ", err)
        end
    end
    return metrics
end
--- Render the current metrics in Prometheus text exposition format.
-- Every series is exposed as a gauge, each preceded by a "# TYPE" line.
-- Missing or non-numeric values are rendered as 0 instead of making
-- string.format raise. Lines are joined with "\n" and there is no trailing
-- newline.
-- @treturn string exposition text
function monitor.prometheus_metrics()
    local metrics = monitor.collect_metrics()
    local conn = metrics.connections or {}
    local rooms = metrics.rooms or {}
    local perf = metrics.performance or {}

    -- {metric name, format of the value, value}
    local series = {
        {"websocket_connections_total", "%d", conn.total},
        {"websocket_connections_active", "%d", conn.active},
        {"websocket_users_online", "%d", conn.users_online},
        {"websocket_rooms_total", "%d", rooms.total},
        {"websocket_rooms_public", "%d", rooms.public},
        {"websocket_rooms_private", "%d", rooms.private},
        {"websocket_room_members_total", "%d", rooms.total_members},
        {"websocket_memory_usage_kb", "%.2f", perf.memory_usage},
        {"websocket_uptime_seconds", "%d", perf.uptime},
    }

    local output = {}
    for _, entry in ipairs(series) do
        local name, fmt, value = entry[1], entry[2], entry[3]
        if type(value) ~= "number" then
            value = 0
        elseif fmt == "%d" then
            value = math.floor(value) -- %d rejects fractional values on Lua 5.3+
        end
        output[#output + 1] = "# TYPE " .. name .. " gauge"
        output[#output + 1] = string.format("%s " .. fmt, name, value)
    end
    return table.concat(output, "\n")
end
return monitor
7. 部署与运维
7.1 生产环境配置
#!/bin/bash
# deploy_websocket.sh - deploy the OpenResty WebSocket service.
#
# Copies the Lua modules into place, fixes ownership, validates the nginx
# configuration, reloads nginx and finally starts and enables the
# websocket-monitor systemd unit.
# The module tree is read from $SRC_DIR (default: ./websocket relative to
# the current directory, as before).
set -eu

LUA_DIR=/usr/local/openresty/lualib/websocket
LOG_DIR=/var/log/websocket
WWW_DIR=/var/www/websocket
SRC_DIR="${SRC_DIR:-websocket}"

echo "Deploying WebSocket service..."

if [ ! -d "$SRC_DIR" ]; then
    echo "Source directory '$SRC_DIR' not found" >&2
    exit 1
fi

# Create the directory layout.
mkdir -p "$LUA_DIR" "$LOG_DIR" "$WWW_DIR"

# Copy the Lua modules ("/." also copies dotfiles and works when the
# directory is empty, unlike "/*").
cp -r "$SRC_DIR"/. "$LUA_DIR"/

# Hand ownership to the nginx user.
chown -R nginx:nginx "$LUA_DIR" "$LOG_DIR" "$WWW_DIR"

# Test inside the `if`: under `set -e` a bare failing `nginx -t` would abort
# the script before `$?` could be checked, so the failure branch below was
# unreachable.
if nginx -t; then
    echo "Configuration test passed"
    nginx -s reload
    echo "WebSocket service deployed successfully"
else
    echo "Configuration test failed" >&2
    exit 1
fi

# Start monitoring.
echo "Starting monitoring..."
systemctl start websocket-monitor
systemctl enable websocket-monitor
echo "Deployment completed"
7.2 健康检查
-- websocket/health.lua - 健康检查模块
local health = {}
local cjson = require "cjson"
--- Run all health checks and aggregate their results.
-- Runs check_shared_memory, check_redis and check_connections in that order.
-- A check that raises is recorded as status "error" with the error text,
-- instead of aborting the whole report. The overall status is "healthy"
-- only when every check reports "ok"; a "warning" also yields "unhealthy".
-- NOTE(review): treating "warning" as unhealthy makes a busy node fail its
-- health check (the admin endpoint maps this to 503) — confirm intended.
-- @treturn table {status, timestamp, checks}
function health.check()
    local status = {
        status = "healthy",
        timestamp = ngx.time(),
        checks = {}
    }

    local probes = {
        {"shared_memory", health.check_shared_memory},
        {"redis", health.check_redis},
        {"connections", health.check_connections},
    }
    for _, probe in ipairs(probes) do
        local name, run = probe[1], probe[2]
        local ok, result = pcall(run)
        if not ok or type(result) ~= "table" then
            result = {
                name = name,
                status = "error",
                message = "Health check failed: " .. tostring(result)
            }
        end
        status.checks[#status.checks + 1] = result
        if result.status ~= "ok" then
            status.status = "unhealthy"
        end
    end

    return status
end
--- Verify that the websocket_connections shared dict is usable.
-- Writes a probe key with a 1 s expiry, reads it back, compares the value
-- and deletes the key.
-- @treturn table {name, status ("ok"/"error"), message}
function health.check_shared_memory()
    local check = {
        name = "shared_memory",
        status = "ok",
        message = "Shared memory is accessible"
    }

    local cache = ngx.shared.websocket_connections
    if not cache then
        check.status = "error"
        check.message = "Shared memory not available"
        return check
    end

    -- Worker id and sub-second time keep probes from concurrent workers or
    -- requests from sharing (and deleting) the same key.
    local test_key = string.format("health_check_%s_%s",
        tostring(ngx.worker.id()), tostring(ngx.now()))
    local ok, err = cache:set(test_key, "test", 1)
    if not ok then
        check.status = "error"
        check.message = "Failed to write to shared memory: " .. (err or "unknown error")
        return check
    end

    -- The check previously only wrote; verify the read path too.
    local value = cache:get(test_key)
    cache:delete(test_key)
    if value ~= "test" then
        check.status = "error"
        check.message = "Shared memory read-back mismatch"
    end
    return check
end
--- Verify that Redis accepts connections and answers PING.
-- @tparam[opt="127.0.0.1"] string host
-- @tparam[opt=6379] number port
-- @treturn table {name, status ("ok"/"error"), message}
function health.check_redis(host, port)
    host = host or "127.0.0.1"
    port = port or 6379

    local check = {
        name = "redis",
        status = "ok",
        message = "Redis connection is healthy"
    }

    local redis = require "resty.redis"
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect(host, port)
    if not ok then
        check.status = "error"
        check.message = "Failed to connect to Redis: " .. (err or "unknown error")
        return check
    end

    local res, ping_err = red:ping()
    if not res then
        -- Previously the connection was left open on this path.
        red:close()
        check.status = "error"
        check.message = "Redis ping failed: " .. (ping_err or "unknown error")
        return check
    end

    local pooled, pool_err = red:set_keepalive(10000, 100)
    if not pooled then
        ngx.log(ngx.WARN, "Failed to return Redis connection to pool: ", pool_err)
    end
    return check
end
--- Compare the active connection count against a threshold.
-- Reports "warning" when active connections exceed `max_connections`.
-- A missing or non-numeric stat is treated as 0.
-- @tparam[opt=10000] number max_connections warning threshold
-- @treturn table {name, status ("ok"/"warning"), message, details}
function health.check_connections(max_connections)
    -- connection_manager was used here without being required.
    local connection_manager = require "websocket.connection_manager"
    max_connections = max_connections or 10000

    local check = {
        name = "connections",
        status = "ok",
        message = "Connection count is normal"
    }

    local stats = connection_manager.get_connection_stats() or {}
    local active = tonumber(stats.active_connections) or 0
    if active > max_connections then
        check.status = "warning"
        check.message = string.format("High connection count: %d/%d",
            active, max_connections)
    end

    check.details = {
        active_connections = active,
        users_online = stats.users_online,
        max_connections = max_connections
    }
    return check
end
return health
8. 故障排查
8.1 常见问题诊断
-- websocket/diagnostics.lua - 诊断模块
local diagnostics = {}
local cjson = require "cjson"
--- Diagnose common WebSocket connection problems.
-- Inspects three things:
-- * the active connection count (none at all, or above a threshold);
-- * the current Lua VM's heap size (collectgarbage("count"), in KB);
-- * the count returned by connection_manager.cleanup_expired_connections().
-- NOTE(review): judging by its name, cleanup_expired_connections() also
-- removes those connections, so a diagnosis is not read-only — confirm.
-- @tparam[opt] table opts thresholds: high_connections (default 5000),
--   memory_limit_kb (default 500000, about 500 MB), expired_connections (default 100)
-- @treturn table {timestamp, issues_found, issues}
function diagnostics.diagnose_connection_issues(opts)
    -- connection_manager was used here without being required.
    local connection_manager = require "websocket.connection_manager"
    opts = opts or {}
    local high_connections = opts.high_connections or 5000
    local memory_limit_kb = opts.memory_limit_kb or 500000
    local expired_limit = opts.expired_connections or 100

    local issues = {}

    local stats = connection_manager.get_connection_stats() or {}
    local active = tonumber(stats.active_connections) or 0
    if active == 0 then
        issues[#issues + 1] = {
            type = "no_connections",
            severity = "warning",
            message = "No active WebSocket connections",
            suggestions = {
                "Check if WebSocket endpoint is accessible",
                "Verify client connection code",
                "Check firewall settings"
            }
        }
    elseif active > high_connections then
        issues[#issues + 1] = {
            type = "high_connections",
            severity = "warning",
            message = string.format("High connection count: %d", active),
            suggestions = {
                "Monitor for connection leaks",
                "Implement connection limits",
                "Check for DDoS attacks"
            }
        }
    end

    local memory_usage = collectgarbage("count")
    if memory_usage > memory_limit_kb then
        issues[#issues + 1] = {
            type = "high_memory",
            severity = "error",
            message = string.format("High memory usage: %.2f MB", memory_usage / 1024),
            suggestions = {
                "Check for memory leaks",
                "Implement garbage collection",
                "Review shared memory usage"
            }
        }
    end

    local expired_count = tonumber(connection_manager.cleanup_expired_connections()) or 0
    if expired_count > expired_limit then
        issues[#issues + 1] = {
            type = "many_expired_connections",
            severity = "warning",
            message = string.format("Found %d expired connections", expired_count),
            suggestions = {
                "Check client heartbeat implementation",
                "Review connection timeout settings",
                "Monitor network stability"
            }
        }
    end

    return {
        timestamp = ngx.time(),
        issues_found = #issues,
        issues = issues
    }
end
--- Build a full diagnostic report.
-- Combines server information, connection diagnostics, a metrics snapshot
-- and the health check. A section that raises is replaced by {error = msg},
-- so one failing subsystem does not hide the rest of the report.
-- @treturn table report
function diagnostics.generate_report()
    -- monitor and health were used here without being required.
    local monitor = require "websocket.monitor"
    local health = require "websocket.health"

    local function section(run)
        local ok, result = pcall(run)
        if ok then
            return result
        end
        return { error = tostring(result) }
    end

    return {
        timestamp = ngx.time(),
        server_info = {
            -- ngx.config.nginx_version is the nginx core version as a number
            -- (e.g. 1.19.9 -> 1019009), not an OpenResty version; the old
            -- key is kept for compatibility.
            openresty_version = ngx.config.nginx_version,
            nginx_version = ngx.config.nginx_version,
            ngx_lua_version = ngx.config.ngx_lua_version,
            lua_version = _VERSION,
            worker_id = ngx.worker.id(),
            worker_count = ngx.worker.count()
        },
        connection_issues = section(diagnostics.diagnose_connection_issues),
        performance_metrics = section(monitor.collect_metrics),
        health_status = section(health.check)
    }
end
return diagnostics
8.2 管理接口
-- websocket/admin.lua - 管理接口模块
local admin = {}
local cjson = require "cjson"
-- Exact-URI routes to admin handler names. Names are resolved at request
-- time because the handlers are defined further down in this module.
local ADMIN_ROUTES = {
    ["/api/ws/stats"] = "handle_stats_request",
    ["/api/ws/connections"] = "handle_connections_request",
    ["/api/ws/rooms"] = "handle_rooms_request",
    ["/api/ws/health"] = "handle_health_request",
    ["/api/ws/diagnostics"] = "handle_diagnostics_request",
    ["/api/ws/metrics"] = "handle_metrics_request",
}

--- Route an admin API request to its handler by exact URI match.
-- Unknown URIs get a 404 JSON error.
-- NOTE(review): these endpoints can list and drop connections and delete
-- rooms, and nothing here authenticates the caller — restrict the serving
-- location (allow/deny, auth) before exposing it.
function admin.handle_request()
    local handler_name = ADMIN_ROUTES[ngx.var.uri]
    if handler_name then
        admin[handler_name]()
        return
    end

    ngx.status = 404
    ngx.header.content_type = "application/json"
    ngx.say(cjson.encode({error = "Endpoint not found"}))
end
--- GET /api/ws/stats: connection and room statistics as JSON.
function admin.handle_stats_request()
    -- Neither manager was required by this module before.
    local connection_manager = require "websocket.connection_manager"
    local room_manager = require "websocket.room_manager"

    ngx.header.content_type = "application/json"
    ngx.say(cjson.encode({
        connections = connection_manager.get_connection_stats(),
        rooms = room_manager.get_room_stats(),
        timestamp = ngx.time()
    }))
end
--- /api/ws/connections
-- * GET lists all connections.
-- * DELETE ?connection_id=... unregisters one connection.
-- * Any other method gets 405.
function admin.handle_connections_request()
    -- connection_manager was used here without being required.
    local connection_manager = require "websocket.connection_manager"
    ngx.header.content_type = "application/json"

    local method = ngx.var.request_method
    if method == "GET" then
        ngx.say(cjson.encode({connections = connection_manager.get_all_connections()}))
    elseif method == "DELETE" then
        -- A repeated query arg arrives as a table and a valueless one as true.
        local connection_id = ngx.req.get_uri_args().connection_id
        if type(connection_id) == "string" and connection_id ~= "" then
            local success = connection_manager.unregister_connection(connection_id)
            ngx.say(cjson.encode({success = not not success}))
        else
            ngx.status = 400
            ngx.say(cjson.encode({error = "connection_id is required"}))
        end
    else
        ngx.status = 405
        ngx.header["Allow"] = "GET, DELETE"
        ngx.say(cjson.encode({error = "Method not allowed"}))
    end
end
--- /api/ws/rooms
-- * GET lists all rooms.
-- * DELETE ?room_id=... deletes a room as "admin".
-- * Any other method gets 405.
function admin.handle_rooms_request()
    -- room_manager was used here without being required.
    local room_manager = require "websocket.room_manager"
    ngx.header.content_type = "application/json"

    local method = ngx.var.request_method
    if method == "GET" then
        ngx.say(cjson.encode({rooms = room_manager.get_all_rooms()}))
    elseif method == "DELETE" then
        -- A repeated query arg arrives as a table and a valueless one as true.
        local room_id = ngx.req.get_uri_args().room_id
        if type(room_id) == "string" and room_id ~= "" then
            local success = room_manager.delete_room(room_id, "admin")
            ngx.say(cjson.encode({success = not not success}))
        else
            ngx.status = 400
            ngx.say(cjson.encode({error = "room_id is required"}))
        end
    else
        ngx.status = 405
        ngx.header["Allow"] = "GET, DELETE"
        ngx.say(cjson.encode({error = "Method not allowed"}))
    end
end
--- GET /api/ws/health: 200 when healthy, 503 otherwise, with the check details as JSON.
-- A health check that raises is reported as 503 "unhealthy" rather than
-- surfacing as a 500.
function admin.handle_health_request()
    -- health was used here without being required.
    local health = require "websocket.health"
    ngx.header.content_type = "application/json"

    local ok, health_status = pcall(health.check)
    if not ok then
        health_status = {
            status = "unhealthy",
            timestamp = ngx.time(),
            error = tostring(health_status)
        }
    end

    if health_status.status == "healthy" then
        ngx.status = 200
    else
        ngx.status = 503
    end
    ngx.say(cjson.encode(health_status))
end
--- GET /api/ws/diagnostics: the full diagnostic report as JSON (500 if it cannot be built).
function admin.handle_diagnostics_request()
    -- diagnostics was used here without being required.
    local diagnostics = require "websocket.diagnostics"
    ngx.header.content_type = "application/json"

    local ok, report = pcall(diagnostics.generate_report)
    if not ok then
        ngx.status = 500
        ngx.say(cjson.encode({error = tostring(report)}))
        return
    end
    ngx.say(cjson.encode(report))
end
--- GET /api/ws/metrics: metrics in Prometheus text exposition format.
function admin.handle_metrics_request()
    -- monitor was used here without being required.
    local monitor = require "websocket.monitor"
    -- Content type defined by the Prometheus text exposition format 0.0.4.
    ngx.header.content_type = "text/plain; version=0.0.4; charset=utf-8"

    local ok, metrics = pcall(monitor.prometheus_metrics)
    if not ok then
        ngx.status = 500
        ngx.say("# error collecting metrics: ", tostring(metrics))
        return
    end
    ngx.say(metrics)
end
return admin
总结
OpenResty的WebSocket支持为构建实时通信应用提供了强大的基础设施。通过合理的架构设计、连接管理、消息路由和性能优化,可以构建高性能、可扩展的实时应用系统。
关键要点
- 连接管理:使用共享内存管理连接状态,实现高效的连接跟踪
- 消息路由:设计灵活的消息路由机制,支持多种消息类型
- 房间管理:实现房间概念,支持群组通信和游戏场景
- 性能优化:通过连接池、缓存和异步处理提升性能
- 监控运维:建立完善的监控和诊断机制,确保服务稳定性
最佳实践
- 实现心跳机制确保连接活性
- 使用消息队列(如 Redis 列表或共享内存)在不同请求、定时器等上下文之间传递消息:cosocket 对象(包括 WebSocket 连接)只能在创建它的请求中使用,不能直接交给定时器或其他请求
- 建立完善的错误处理和恢复机制
- 实施安全认证和访问控制
- 定期清理过期连接和数据
- 监控关键性能指标
- 准备故障排查和恢复方案