1. WebSocket基础概念

1.1 WebSocket协议简介

WebSocket是一种在单个TCP连接上进行全双工通信的协议: - 持久连接:建立连接后保持长期通信 - 双向通信:客户端和服务器都可以主动发送数据 - 低延迟:避免了HTTP请求/响应的开销 - 实时性:支持实时数据推送 - 协议升级:从HTTP协议升级而来

1.2 WebSocket在OpenResty中的应用

┌─────────────────────────────────────────────────────────┐
│                    客户端应用                           │
│  ┌─────────────┬─────────────┬─────────────┬─────────┐  │
│  │   Web浏览器  │  移动应用    │   桌面应用   │  IoT设备 │  │
│  └─────────────┴─────────────┴─────────────┴─────────┘  │
└─────────────────┬───────────────────────────────────────┘
                  │ WebSocket连接
┌─────────────────▼───────────────────────────────────────┐
│                OpenResty WebSocket网关                 │
│  ┌─────────────┬─────────────┬─────────────┬─────────┐  │
│  │  连接管理    │  消息路由    │   认证鉴权   │  负载均衡│  │
│  └─────────────┴─────────────┴─────────────┴─────────┘  │
└─────────┬───────────┬───────────┬───────────┬───────────┘
          │           │           │           │
┌─────────▼─┐ ┌───────▼─┐ ┌───────▼─┐ ┌───────▼─┐
│  聊天服务  │ │ 推送服务 │ │ 游戏服务 │ │ 监控服务 │
└───────────┘ └─────────┘ └─────────┘ └─────────┘

1.3 应用场景

  • 即时通讯:聊天应用、在线客服
  • 实时推送:消息通知、状态更新
  • 在线游戏:多人游戏、实时对战
  • 数据监控:实时图表、系统监控
  • 协作工具:在线编辑、白板应用
  • 金融交易:股票行情、交易数据

2. OpenResty WebSocket配置

2.1 基础配置

# nginx.conf - WebSocket配置
http {
    # 共享内存配置
    lua_shared_dict websocket_connections 100m;
    lua_shared_dict websocket_rooms 50m;
    lua_shared_dict websocket_users 50m;
    
    # Lua包路径
    lua_package_path "/usr/local/openresty/lualib/websocket/?.lua;;";
    
    # 初始化
    init_by_lua_block {
        require "websocket.init"
    }
    
    init_worker_by_lua_block {
        require "websocket.worker"
    }
    
    # WebSocket服务器
    server {
        listen 8080;
        server_name ws.example.com;
        
        # WebSocket升级处理
        location /ws {
            # 设置WebSocket升级头
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            
            # WebSocket处理
            content_by_lua_block {
                local websocket_handler = require "websocket.handler"
                websocket_handler.handle_connection()
            }
        }
        
        # 聊天室WebSocket
        location /ws/chat {
            content_by_lua_block {
                local chat_handler = require "websocket.chat"
                chat_handler.handle_chat_connection()
            }
        }
        
        # 推送服务WebSocket
        location /ws/push {
            content_by_lua_block {
                local push_handler = require "websocket.push"
                push_handler.handle_push_connection()
            }
        }
        
        # 游戏服务WebSocket
        location /ws/game {
            content_by_lua_block {
                local game_handler = require "websocket.game"
                game_handler.handle_game_connection()
            }
        }
        
        # WebSocket管理接口
        location /api/ws/ {
            content_by_lua_block {
                local ws_admin = require "websocket.admin"
                ws_admin.handle_request()
            }
        }
        
        # 静态文件服务
        location / {
            root /var/www/websocket;
            index index.html;
        }
    }
}

2.2 WebSocket核心处理模块

-- websocket/handler.lua - WebSocket核心处理模块
local handler = {}
local server = require "resty.websocket.server"
local cjson = require "cjson"
local auth = require "websocket.auth"
local connection_manager = require "websocket.connection_manager"
local message_router = require "websocket.message_router"
local logger = require "websocket.logger"

-- 处理WebSocket连接
function handler.handle_connection()
    -- 创建WebSocket服务器实例
    local wb, err = server:new({
        timeout = 5000,  -- 5秒超时
        max_payload_len = 65535,  -- 最大消息长度
    })
    
    if not wb then
        ngx.log(ngx.ERR, "Failed to create WebSocket server: ", err)
        return ngx.exit(444)
    end
    
    -- 生成连接ID
    local connection_id = string.format("%s-%d-%d", 
        ngx.var.remote_addr, ngx.time(), math.random(10000, 99999))
    
    ngx.log(ngx.INFO, "New WebSocket connection: ", connection_id)
    
    -- 认证处理
    local auth_result = auth.authenticate_websocket()
    if not auth_result.success then
        local close_frame = {
            code = 1008,  -- Policy Violation
            reason = auth_result.error or "Authentication failed"
        }
        wb:send_close(close_frame.code, close_frame.reason)
        return
    end
    
    local user = auth_result.user
    
    -- 注册连接
    local connection_info = {
        connection_id = connection_id,
        user_id = user.user_id,
        username = user.username,
        remote_addr = ngx.var.remote_addr,
        user_agent = ngx.var.http_user_agent,
        connect_time = ngx.time(),
        last_ping = ngx.time(),
        status = "connected"
    }
    
    connection_manager.register_connection(connection_info)
    
    -- 发送连接确认消息
    local welcome_message = {
        type = "connection",
        status = "connected",
        connection_id = connection_id,
        user = {
            user_id = user.user_id,
            username = user.username
        },
        timestamp = ngx.time()
    }
    
    local ok, err = wb:send_text(cjson.encode(welcome_message))
    if not ok then
        ngx.log(ngx.ERR, "Failed to send welcome message: ", err)
        connection_manager.unregister_connection(connection_id)
        return
    end
    
    -- 消息处理循环
    while true do
        local data, typ, err = wb:recv_frame()
        
        if not data then
            if err then
                ngx.log(ngx.WARN, "WebSocket receive error: ", err)
            end
            break
        end
        
        -- 更新最后活动时间
        connection_manager.update_last_activity(connection_id)
        
        if typ == "close" then
            ngx.log(ngx.INFO, "WebSocket connection closed by client: ", connection_id)
            break
        elseif typ == "ping" then
            -- 响应ping
            local ok, err = wb:send_pong(data)
            if not ok then
                ngx.log(ngx.ERR, "Failed to send pong: ", err)
                break
            end
        elseif typ == "pong" then
            -- 处理pong响应
            ngx.log(ngx.DEBUG, "Received pong from client: ", connection_id)
        elseif typ == "text" then
            -- 处理文本消息
            local success, message = pcall(cjson.decode, data)
            if success then
                handler.process_message(wb, connection_info, message)
            else
                ngx.log(ngx.WARN, "Invalid JSON message from ", connection_id, ": ", data)
                local error_response = {
                    type = "error",
                    error = "Invalid JSON format",
                    timestamp = ngx.time()
                }
                wb:send_text(cjson.encode(error_response))
            end
        elseif typ == "binary" then
            -- 处理二进制消息
            handler.process_binary_message(wb, connection_info, data)
        end
    end
    
    -- 清理连接
    connection_manager.unregister_connection(connection_id)
    ngx.log(ngx.INFO, "WebSocket connection closed: ", connection_id)
end

-- 处理文本消息
function handler.process_message(wb, connection_info, message)
    if not message.type then
        local error_response = {
            type = "error",
            error = "Message type is required",
            timestamp = ngx.time()
        }
        wb:send_text(cjson.encode(error_response))
        return
    end
    
    -- 记录消息
    logger.log_message({
        connection_id = connection_info.connection_id,
        user_id = connection_info.user_id,
        message_type = message.type,
        message = message,
        timestamp = ngx.time()
    })
    
    -- 路由消息
    local response = message_router.route_message(connection_info, message)
    
    if response then
        local ok, err = wb:send_text(cjson.encode(response))
        if not ok then
            ngx.log(ngx.ERR, "Failed to send response: ", err)
        end
    end
end

-- 处理二进制消息
function handler.process_binary_message(wb, connection_info, data)
    -- 处理二进制数据(如文件上传、图片等)
    ngx.log(ngx.INFO, "Received binary message from ", connection_info.connection_id, 
           ", size: ", #data)
    
    -- 这里可以根据需要处理二进制数据
    -- 例如:文件上传、图片传输等
    
    local response = {
        type = "binary_ack",
        size = #data,
        timestamp = ngx.time()
    }
    
    wb:send_text(cjson.encode(response))
end

-- 发送消息到指定连接
function handler.send_message_to_connection(connection_id, message)
    local connection = connection_manager.get_connection(connection_id)
    if not connection then
        return false, "Connection not found"
    end
    
    -- 这里需要实现向特定连接发送消息的逻辑
    -- 由于WebSocket连接是在协程中处理的,需要使用共享内存或消息队列
    
    return true
end

-- 广播消息到所有连接
function handler.broadcast_message(message, filter)
    local connections = connection_manager.get_all_connections()
    local sent_count = 0
    
    for connection_id, connection in pairs(connections) do
        local should_send = true
        
        -- 应用过滤器
        if filter then
            should_send = filter(connection)
        end
        
        if should_send then
            local success = handler.send_message_to_connection(connection_id, message)
            if success then
                sent_count = sent_count + 1
            end
        end
    end
    
    return sent_count
end

return handler

3. 连接管理

3.1 连接管理器

-- websocket/connection_manager.lua - 连接管理模块
local connection_manager = {}
local cjson = require "cjson"

-- 注册连接
function connection_manager.register_connection(connection_info)
    local cache = ngx.shared.websocket_connections
    if not cache then
        return false, "Shared memory not available"
    end
    
    local connection_data = cjson.encode(connection_info)
    local ok, err = cache:set(connection_info.connection_id, connection_data, 3600)
    
    if not ok then
        ngx.log(ngx.ERR, "Failed to register connection: ", err)
        return false, err
    end
    
    -- 按用户索引连接
    local user_connections_key = "user:" .. connection_info.user_id
    local user_connections = cache:get(user_connections_key)
    local connections_list = user_connections and cjson.decode(user_connections) or {}
    
    table.insert(connections_list, connection_info.connection_id)
    cache:set(user_connections_key, cjson.encode(connections_list), 3600)
    
    -- 更新连接统计
    cache:incr("stats:total_connections", 1, 0)
    cache:incr("stats:user:" .. connection_info.user_id .. ":connections", 1, 0)
    
    ngx.log(ngx.INFO, "Connection registered: ", connection_info.connection_id)
    return true
end

-- 注销连接
function connection_manager.unregister_connection(connection_id)
    local cache = ngx.shared.websocket_connections
    if not cache then
        return false, "Shared memory not available"
    end
    
    -- 获取连接信息
    local connection_data = cache:get(connection_id)
    if not connection_data then
        return false, "Connection not found"
    end
    
    local connection_info = cjson.decode(connection_data)
    
    -- 删除连接
    cache:delete(connection_id)
    
    -- 从用户连接列表中移除
    local user_connections_key = "user:" .. connection_info.user_id
    local user_connections = cache:get(user_connections_key)
    
    if user_connections then
        local connections_list = cjson.decode(user_connections)
        local new_list = {}
        
        for _, conn_id in ipairs(connections_list) do
            if conn_id ~= connection_id then
                table.insert(new_list, conn_id)
            end
        end
        
        if #new_list > 0 then
            cache:set(user_connections_key, cjson.encode(new_list), 3600)
        else
            cache:delete(user_connections_key)
        end
    end
    
    -- 更新连接统计
    cache:incr("stats:total_connections", -1, 0)
    cache:incr("stats:user:" .. connection_info.user_id .. ":connections", -1, 0)
    
    ngx.log(ngx.INFO, "Connection unregistered: ", connection_id)
    return true
end

-- 获取连接信息
function connection_manager.get_connection(connection_id)
    local cache = ngx.shared.websocket_connections
    if not cache then
        return nil
    end
    
    local connection_data = cache:get(connection_id)
    if connection_data then
        return cjson.decode(connection_data)
    end
    
    return nil
end

-- 获取用户的所有连接
function connection_manager.get_user_connections(user_id)
    local cache = ngx.shared.websocket_connections
    if not cache then
        return {}
    end
    
    local user_connections_key = "user:" .. user_id
    local user_connections = cache:get(user_connections_key)
    
    if user_connections then
        local connections_list = cjson.decode(user_connections)
        local connections = {}
        
        for _, connection_id in ipairs(connections_list) do
            local connection = connection_manager.get_connection(connection_id)
            if connection then
                connections[connection_id] = connection
            end
        end
        
        return connections
    end
    
    return {}
end

-- 获取所有连接
function connection_manager.get_all_connections()
    local cache = ngx.shared.websocket_connections
    if not cache then
        return {}
    end
    
    local connections = {}
    local keys = cache:get_keys(0)  -- 获取所有键
    
    for _, key in ipairs(keys) do
        if not string.match(key, "^user:") and not string.match(key, "^stats:") then
            local connection_data = cache:get(key)
            if connection_data then
                connections[key] = cjson.decode(connection_data)
            end
        end
    end
    
    return connections
end

-- 更新连接最后活动时间
function connection_manager.update_last_activity(connection_id)
    local cache = ngx.shared.websocket_connections
    if not cache then
        return false
    end
    
    local connection_data = cache:get(connection_id)
    if connection_data then
        local connection_info = cjson.decode(connection_data)
        connection_info.last_ping = ngx.time()
        
        cache:set(connection_id, cjson.encode(connection_info), 3600)
        return true
    end
    
    return false
end

-- 获取连接统计信息
function connection_manager.get_connection_stats()
    local cache = ngx.shared.websocket_connections
    if not cache then
        return {error = "Shared memory not available"}
    end
    
    local total_connections = cache:get("stats:total_connections") or 0
    local connections = connection_manager.get_all_connections()
    
    local stats = {
        total_connections = total_connections,
        active_connections = 0,
        users_online = 0,
        connections_by_user = {},
        timestamp = ngx.time()
    }
    
    local users = {}
    
    for connection_id, connection in pairs(connections) do
        stats.active_connections = stats.active_connections + 1
        
        if not users[connection.user_id] then
            users[connection.user_id] = 0
            stats.users_online = stats.users_online + 1
        end
        
        users[connection.user_id] = users[connection.user_id] + 1
    end
    
    stats.connections_by_user = users
    
    return stats
end

-- 清理过期连接
function connection_manager.cleanup_expired_connections()
    local cache = ngx.shared.websocket_connections
    if not cache then
        return 0
    end
    
    local current_time = ngx.time()
    local timeout = 300  -- 5分钟超时
    local cleaned_count = 0
    
    local connections = connection_manager.get_all_connections()
    
    for connection_id, connection in pairs(connections) do
        if current_time - connection.last_ping > timeout then
            connection_manager.unregister_connection(connection_id)
            cleaned_count = cleaned_count + 1
            ngx.log(ngx.INFO, "Cleaned expired connection: ", connection_id)
        end
    end
    
    return cleaned_count
end

-- 启动清理定时器
function connection_manager.start_cleanup_timer()
    local function cleanup_timer(premature)
        if premature then
            return
        end
        
        local cleaned = connection_manager.cleanup_expired_connections()
        if cleaned > 0 then
            ngx.log(ngx.INFO, "Cleaned ", cleaned, " expired connections")
        end
        
        -- 重新设置定时器(每5分钟清理一次)
        local ok, err = ngx.timer.at(300, cleanup_timer)
        if not ok then
            ngx.log(ngx.ERR, "Failed to create cleanup timer: ", err)
        end
    end
    
    local ok, err = ngx.timer.at(300, cleanup_timer)
    if not ok then
        ngx.log(ngx.ERR, "Failed to create initial cleanup timer: ", err)
    end
end

return connection_manager

3.2 消息路由器

-- websocket/message_router.lua - 消息路由模块
local message_router = {}
local cjson = require "cjson"
local connection_manager = require "websocket.connection_manager"

-- 消息处理器注册表
local message_handlers = {}

-- 注册消息处理器
function message_router.register_handler(message_type, handler)
    message_handlers[message_type] = handler
end

-- 路由消息
function message_router.route_message(connection_info, message)
    local message_type = message.type
    local handler = message_handlers[message_type]
    
    if handler then
        return handler(connection_info, message)
    else
        ngx.log(ngx.WARN, "No handler found for message type: ", message_type)
        return {
            type = "error",
            error = "Unknown message type: " .. message_type,
            timestamp = ngx.time()
        }
    end
end

-- 默认消息处理器

-- Ping处理器
message_router.register_handler("ping", function(connection_info, message)
    return {
        type = "pong",
        timestamp = ngx.time(),
        server_time = ngx.time()
    }
end)

-- 心跳处理器
message_router.register_handler("heartbeat", function(connection_info, message)
    connection_manager.update_last_activity(connection_info.connection_id)
    
    return {
        type = "heartbeat_ack",
        timestamp = ngx.time()
    }
end)

-- 获取在线用户列表
message_router.register_handler("get_online_users", function(connection_info, message)
    local stats = connection_manager.get_connection_stats()
    local online_users = {}
    
    for user_id, connection_count in pairs(stats.connections_by_user) do
        table.insert(online_users, {
            user_id = user_id,
            connection_count = connection_count
        })
    end
    
    return {
        type = "online_users",
        users = online_users,
        total_users = stats.users_online,
        timestamp = ngx.time()
    }
end)

-- 私聊消息处理器
message_router.register_handler("private_message", function(connection_info, message)
    local target_user_id = message.target_user_id
    local content = message.content
    
    if not target_user_id or not content then
        return {
            type = "error",
            error = "target_user_id and content are required",
            timestamp = ngx.time()
        }
    end
    
    -- 获取目标用户的连接
    local target_connections = connection_manager.get_user_connections(target_user_id)
    
    if next(target_connections) == nil then
        return {
            type = "error",
            error = "Target user is not online",
            timestamp = ngx.time()
        }
    end
    
    -- 构造消息
    local private_message = {
        type = "private_message",
        from_user_id = connection_info.user_id,
        from_username = connection_info.username,
        content = content,
        timestamp = ngx.time()
    }
    
    -- 发送给目标用户的所有连接
    local sent_count = 0
    for connection_id, _ in pairs(target_connections) do
        -- 这里需要实现向特定连接发送消息的逻辑
        -- 由于WebSocket连接是在协程中处理的,需要使用消息队列
        message_router.queue_message(connection_id, private_message)
        sent_count = sent_count + 1
    end
    
    return {
        type = "message_sent",
        target_user_id = target_user_id,
        sent_to_connections = sent_count,
        timestamp = ngx.time()
    }
end)

-- 广播消息处理器
message_router.register_handler("broadcast", function(connection_info, message)
    local content = message.content
    
    if not content then
        return {
            type = "error",
            error = "content is required",
            timestamp = ngx.time()
        }
    end
    
    -- 检查权限(只有管理员可以广播)
    if connection_info.role ~= "admin" then
        return {
            type = "error",
            error = "Permission denied",
            timestamp = ngx.time()
        }
    end
    
    -- 构造广播消息
    local broadcast_message = {
        type = "broadcast",
        from_user_id = connection_info.user_id,
        from_username = connection_info.username,
        content = content,
        timestamp = ngx.time()
    }
    
    -- 广播给所有连接
    local connections = connection_manager.get_all_connections()
    local sent_count = 0
    
    for connection_id, _ in pairs(connections) do
        if connection_id ~= connection_info.connection_id then  -- 不发送给自己
            message_router.queue_message(connection_id, broadcast_message)
            sent_count = sent_count + 1
        end
    end
    
    return {
        type = "broadcast_sent",
        sent_to_connections = sent_count,
        timestamp = ngx.time()
    }
end)

-- 加入房间处理器
message_router.register_handler("join_room", function(connection_info, message)
    local room_id = message.room_id
    
    if not room_id then
        return {
            type = "error",
            error = "room_id is required",
            timestamp = ngx.time()
        }
    end
    
    local room_manager = require "websocket.room_manager"
    local success, err = room_manager.join_room(room_id, connection_info.connection_id, connection_info.user_id)
    
    if success then
        -- 通知房间内其他用户
        local join_notification = {
            type = "user_joined",
            room_id = room_id,
            user_id = connection_info.user_id,
            username = connection_info.username,
            timestamp = ngx.time()
        }
        
        room_manager.broadcast_to_room(room_id, join_notification, connection_info.connection_id)
        
        return {
            type = "room_joined",
            room_id = room_id,
            timestamp = ngx.time()
        }
    else
        return {
            type = "error",
            error = err or "Failed to join room",
            timestamp = ngx.time()
        }
    end
end)

-- 离开房间处理器
message_router.register_handler("leave_room", function(connection_info, message)
    local room_id = message.room_id
    
    if not room_id then
        return {
            type = "error",
            error = "room_id is required",
            timestamp = ngx.time()
        }
    end
    
    local room_manager = require "websocket.room_manager"
    local success, err = room_manager.leave_room(room_id, connection_info.connection_id)
    
    if success then
        -- 通知房间内其他用户
        local leave_notification = {
            type = "user_left",
            room_id = room_id,
            user_id = connection_info.user_id,
            username = connection_info.username,
            timestamp = ngx.time()
        }
        
        room_manager.broadcast_to_room(room_id, leave_notification)
        
        return {
            type = "room_left",
            room_id = room_id,
            timestamp = ngx.time()
        }
    else
        return {
            type = "error",
            error = err or "Failed to leave room",
            timestamp = ngx.time()
        }
    end
end)

-- 房间消息处理器
message_router.register_handler("room_message", function(connection_info, message)
    local room_id = message.room_id
    local content = message.content
    
    if not room_id or not content then
        return {
            type = "error",
            error = "room_id and content are required",
            timestamp = ngx.time()
        }
    end
    
    local room_manager = require "websocket.room_manager"
    
    -- 检查用户是否在房间中
    if not room_manager.is_user_in_room(room_id, connection_info.connection_id) then
        return {
            type = "error",
            error = "You are not in this room",
            timestamp = ngx.time()
        }
    end
    
    -- 构造房间消息
    local room_message = {
        type = "room_message",
        room_id = room_id,
        from_user_id = connection_info.user_id,
        from_username = connection_info.username,
        content = content,
        timestamp = ngx.time()
    }
    
    -- 广播给房间内所有用户
    local sent_count = room_manager.broadcast_to_room(room_id, room_message, connection_info.connection_id)
    
    return {
        type = "message_sent",
        room_id = room_id,
        sent_to_connections = sent_count,
        timestamp = ngx.time()
    }
end)

-- 消息队列(用于跨协程发送消息)
function message_router.queue_message(connection_id, message)
    local cache = ngx.shared.websocket_connections
    if not cache then
        return false
    end
    
    local queue_key = "queue:" .. connection_id
    local queue_data = cache:get(queue_key)
    local queue = queue_data and cjson.decode(queue_data) or {}
    
    table.insert(queue, {
        message = message,
        timestamp = ngx.time()
    })
    
    -- 限制队列长度
    if #queue > 100 then
        table.remove(queue, 1)  -- 移除最旧的消息
    end
    
    cache:set(queue_key, cjson.encode(queue), 300)  -- 5分钟过期
    return true
end

-- 获取队列中的消息
function message_router.get_queued_messages(connection_id)
    local cache = ngx.shared.websocket_connections
    if not cache then
        return {}
    end
    
    local queue_key = "queue:" .. connection_id
    local queue_data = cache:get(queue_key)
    
    if queue_data then
        local queue = cjson.decode(queue_data)
        cache:delete(queue_key)  -- 清空队列
        return queue
    end
    
    return {}
end

return message_router

4. 房间管理

4.1 房间管理器

-- websocket/room_manager.lua - 房间管理模块
local room_manager = {}
local cjson = require "cjson"
local connection_manager = require "websocket.connection_manager"

-- 创建房间
function room_manager.create_room(room_id, creator_user_id, room_config)
    local cache = ngx.shared.websocket_rooms
    if not cache then
        return false, "Shared memory not available"
    end
    
    -- 检查房间是否已存在
    local existing_room = cache:get(room_id)
    if existing_room then
        return false, "Room already exists"
    end
    
    local room_info = {
        room_id = room_id,
        creator_user_id = creator_user_id,
        created_time = ngx.time(),
        max_users = room_config and room_config.max_users or 100,
        is_private = room_config and room_config.is_private or false,
        password = room_config and room_config.password,
        description = room_config and room_config.description or "",
        members = {},
        member_count = 0
    }
    
    local ok, err = cache:set(room_id, cjson.encode(room_info), 3600)
    if not ok then
        return false, err
    end
    
    ngx.log(ngx.INFO, "Room created: ", room_id, " by user: ", creator_user_id)
    return true
end

-- 删除房间
function room_manager.delete_room(room_id, user_id)
    local cache = ngx.shared.websocket_rooms
    if not cache then
        return false, "Shared memory not available"
    end
    
    local room_data = cache:get(room_id)
    if not room_data then
        return false, "Room not found"
    end
    
    local room_info = cjson.decode(room_data)
    
    -- 检查权限(只有创建者可以删除房间)
    if room_info.creator_user_id ~= user_id then
        return false, "Permission denied"
    end
    
    -- 通知所有房间成员
    local delete_notification = {
        type = "room_deleted",
        room_id = room_id,
        timestamp = ngx.time()
    }
    
    room_manager.broadcast_to_room(room_id, delete_notification)
    
    -- 删除房间
    cache:delete(room_id)
    
    ngx.log(ngx.INFO, "Room deleted: ", room_id)
    return true
end

-- 加入房间
function room_manager.join_room(room_id, connection_id, user_id, password)
    local cache = ngx.shared.websocket_rooms
    if not cache then
        return false, "Shared memory not available"
    end
    
    local room_data = cache:get(room_id)
    if not room_data then
        return false, "Room not found"
    end
    
    local room_info = cjson.decode(room_data)
    
    -- 检查房间是否已满
    if room_info.member_count >= room_info.max_users then
        return false, "Room is full"
    end
    
    -- 检查密码(如果是私有房间)
    if room_info.is_private and room_info.password then
        if password ~= room_info.password then
            return false, "Invalid password"
        end
    end
    
    -- 检查用户是否已在房间中
    if room_info.members[connection_id] then
        return false, "Already in room"
    end
    
    -- 添加成员
    room_info.members[connection_id] = {
        user_id = user_id,
        join_time = ngx.time()
    }
    room_info.member_count = room_info.member_count + 1
    
    local ok, err = cache:set(room_id, cjson.encode(room_info), 3600)
    if not ok then
        return false, err
    end
    
    ngx.log(ngx.INFO, "User ", user_id, " joined room: ", room_id)
    return true
end

-- 离开房间
function room_manager.leave_room(room_id, connection_id)
    local cache = ngx.shared.websocket_rooms
    if not cache then
        return false, "Shared memory not available"
    end
    
    local room_data = cache:get(room_id)
    if not room_data then
        return false, "Room not found"
    end
    
    local room_info = cjson.decode(room_data)
    
    -- 检查用户是否在房间中
    if not room_info.members[connection_id] then
        return false, "Not in room"
    end
    
    local user_id = room_info.members[connection_id].user_id
    
    -- 移除成员
    room_info.members[connection_id] = nil
    room_info.member_count = room_info.member_count - 1
    
    -- 如果房间为空且不是永久房间,则删除房间
    if room_info.member_count == 0 and not room_info.is_permanent then
        cache:delete(room_id)
        ngx.log(ngx.INFO, "Empty room deleted: ", room_id)
    else
        cache:set(room_id, cjson.encode(room_info), 3600)
    end
    
    ngx.log(ngx.INFO, "User ", user_id, " left room: ", room_id)
    return true
end

-- 检查用户是否在房间中
function room_manager.is_user_in_room(room_id, connection_id)
    local cache = ngx.shared.websocket_rooms
    if not cache then
        return false
    end
    
    local room_data = cache:get(room_id)
    if not room_data then
        return false
    end
    
    local room_info = cjson.decode(room_data)
    return room_info.members[connection_id] ~= nil
end

-- 获取房间信息
function room_manager.get_room_info(room_id)
    local cache = ngx.shared.websocket_rooms
    if not cache then
        return nil
    end
    
    local room_data = cache:get(room_id)
    if room_data then
        return cjson.decode(room_data)
    end
    
    return nil
end

-- 获取房间成员列表
function room_manager.get_room_members(room_id)
    local room_info = room_manager.get_room_info(room_id)
    if not room_info then
        return {}
    end
    
    local members = {}
    for connection_id, member_info in pairs(room_info.members) do
        local connection = connection_manager.get_connection(connection_id)
        if connection then
            table.insert(members, {
                connection_id = connection_id,
                user_id = member_info.user_id,
                username = connection.username,
                join_time = member_info.join_time
            })
        end
    end
    
    return members
end

-- 向房间广播消息
function room_manager.broadcast_to_room(room_id, message, exclude_connection_id)
    local room_info = room_manager.get_room_info(room_id)
    if not room_info then
        return 0
    end
    
    local sent_count = 0
    local message_router = require "websocket.message_router"
    
    for connection_id, _ in pairs(room_info.members) do
        if connection_id ~= exclude_connection_id then
            message_router.queue_message(connection_id, message)
            sent_count = sent_count + 1
        end
    end
    
    return sent_count
end

-- 获取所有房间列表
function room_manager.get_all_rooms()
    local cache = ngx.shared.websocket_rooms
    if not cache then
        return {}
    end
    
    local rooms = {}
    local keys = cache:get_keys(0)
    
    for _, room_id in ipairs(keys) do
        local room_data = cache:get(room_id)
        if room_data then
            local room_info = cjson.decode(room_data)
            
            -- 只返回公开房间的基本信息
            if not room_info.is_private then
                table.insert(rooms, {
                    room_id = room_id,
                    description = room_info.description,
                    member_count = room_info.member_count,
                    max_users = room_info.max_users,
                    created_time = room_info.created_time
                })
            end
        end
    end
    
    return rooms
end

-- 获取房间统计信息
function room_manager.get_room_stats()
    local cache = ngx.shared.websocket_rooms
    if not cache then
        return {error = "Shared memory not available"}
    end
    
    local stats = {
        total_rooms = 0,
        public_rooms = 0,
        private_rooms = 0,
        total_members = 0,
        rooms = {},
        timestamp = ngx.time()
    }
    
    local keys = cache:get_keys(0)
    
    for _, room_id in ipairs(keys) do
        local room_data = cache:get(room_id)
        if room_data then
            local room_info = cjson.decode(room_data)
            
            stats.total_rooms = stats.total_rooms + 1
            stats.total_members = stats.total_members + room_info.member_count
            
            if room_info.is_private then
                stats.private_rooms = stats.private_rooms + 1
            else
                stats.public_rooms = stats.public_rooms + 1
            end
            
            stats.rooms[room_id] = {
                member_count = room_info.member_count,
                max_users = room_info.max_users,
                is_private = room_info.is_private,
                created_time = room_info.created_time
            }
        end
    end
    
    return stats
end

return room_manager

5. 实时应用示例

5.1 聊天应用

-- websocket/chat.lua - 聊天应用模块
local chat = {}
local server = require "resty.websocket.server"
local cjson = require "cjson"
local connection_manager = require "websocket.connection_manager"
local room_manager = require "websocket.room_manager"
local message_router = require "websocket.message_router"

-- 处理聊天连接
function chat.handle_chat_connection()
    local wb, err = server:new({
        timeout = 5000,
        max_payload_len = 65535,
    })
    
    if not wb then
        ngx.log(ngx.ERR, "Failed to create WebSocket server: ", err)
        return ngx.exit(444)
    end
    
    -- 认证和连接注册(复用之前的逻辑)
    local connection_id = string.format("chat-%s-%d-%d", 
        ngx.var.remote_addr, ngx.time(), math.random(10000, 99999))
    
    -- 注册聊天特定的消息处理器
    chat.register_chat_handlers()
    
    -- 消息处理循环
    while true do
        local data, typ, err = wb:recv_frame()
        
        if not data then
            break
        end
        
        if typ == "text" then
            local success, message = pcall(cjson.decode, data)
            if success then
                local response = message_router.route_message(connection_info, message)
                if response then
                    wb:send_text(cjson.encode(response))
                end
            end
        end
    end
end

-- 注册聊天特定的消息处理器
function chat.register_chat_handlers()
    -- 发送表情消息
    message_router.register_handler("emoji_message", function(connection_info, message)
        local room_id = message.room_id
        local emoji = message.emoji
        
        if not room_id or not emoji then
            return {
                type = "error",
                error = "room_id and emoji are required",
                timestamp = ngx.time()
            }
        end
        
        local emoji_message = {
            type = "emoji_message",
            room_id = room_id,
            from_user_id = connection_info.user_id,
            from_username = connection_info.username,
            emoji = emoji,
            timestamp = ngx.time()
        }
        
        local sent_count = room_manager.broadcast_to_room(room_id, emoji_message, connection_info.connection_id)
        
        return {
            type = "message_sent",
            sent_to_connections = sent_count,
            timestamp = ngx.time()
        }
    end)
    
    -- 文件分享消息
    message_router.register_handler("file_message", function(connection_info, message)
        local room_id = message.room_id
        local file_info = message.file_info
        
        if not room_id or not file_info then
            return {
                type = "error",
                error = "room_id and file_info are required",
                timestamp = ngx.time()
            }
        end
        
        local file_message = {
            type = "file_message",
            room_id = room_id,
            from_user_id = connection_info.user_id,
            from_username = connection_info.username,
            file_info = file_info,
            timestamp = ngx.time()
        }
        
        local sent_count = room_manager.broadcast_to_room(room_id, file_message, connection_info.connection_id)
        
        return {
            type = "message_sent",
            sent_to_connections = sent_count,
            timestamp = ngx.time()
        }
    end)
    
    -- 正在输入状态
    message_router.register_handler("typing_status", function(connection_info, message)
        local room_id = message.room_id
        local is_typing = message.is_typing
        
        if not room_id then
            return {
                type = "error",
                error = "room_id is required",
                timestamp = ngx.time()
            }
        end
        
        local typing_message = {
            type = "typing_status",
            room_id = room_id,
            user_id = connection_info.user_id,
            username = connection_info.username,
            is_typing = is_typing,
            timestamp = ngx.time()
        }
        
        room_manager.broadcast_to_room(room_id, typing_message, connection_info.connection_id)
        
        return {
            type = "typing_status_sent",
            timestamp = ngx.time()
        }
    end)
end

return chat

5.2 实时推送服务

-- websocket/push.lua - 推送服务模块
local push = {}
local server = require "resty.websocket.server"
local cjson = require "cjson"
local redis = require "resty.redis"
local connection_manager = require "websocket.connection_manager"

-- 处理推送连接
function push.handle_push_connection()
    local wb, err = server:new({
        timeout = 5000,
        max_payload_len = 65535,
    })
    
    if not wb then
        ngx.log(ngx.ERR, "Failed to create WebSocket server: ", err)
        return ngx.exit(444)
    end
    
    -- 连接认证和注册
    local connection_id = string.format("push-%s-%d-%d", 
        ngx.var.remote_addr, ngx.time(), math.random(10000, 99999))
    
    -- 订阅用户感兴趣的推送频道
    push.subscribe_user_channels(connection_info)
    
    -- 启动推送消息检查定时器
    push.start_push_timer(connection_id, wb)
    
    -- 消息处理循环
    while true do
        local data, typ, err = wb:recv_frame()
        
        if not data then
            break
        end
        
        if typ == "text" then
            local success, message = pcall(cjson.decode, data)
            if success then
                push.handle_push_message(connection_info, message, wb)
            end
        end
    end
end

-- 订阅用户频道
function push.subscribe_user_channels(connection_info)
    local red = redis:new()
    red:set_timeout(1000)
    
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        ngx.log(ngx.ERR, "Failed to connect to Redis: ", err)
        return
    end
    
    -- 订阅用户个人频道
    red:subscribe("user:" .. connection_info.user_id)
    
    -- 订阅用户角色频道
    if connection_info.role then
        red:subscribe("role:" .. connection_info.role)
    end
    
    -- 订阅全局频道
    red:subscribe("global")
    
    red:set_keepalive(10000, 100)
end

-- 启动推送定时器
function push.start_push_timer(connection_id, wb)
    local function push_timer(premature)
        if premature then
            return
        end
        
        -- 检查是否有待推送的消息
        local messages = push.get_pending_messages(connection_id)
        
        for _, message in ipairs(messages) do
            local ok, err = wb:send_text(cjson.encode(message))
            if not ok then
                ngx.log(ngx.ERR, "Failed to send push message: ", err)
                return
            end
        end
        
        -- 重新设置定时器
        local ok, err = ngx.timer.at(1, push_timer)  -- 每秒检查一次
        if not ok then
            ngx.log(ngx.ERR, "Failed to create push timer: ", err)
        end
    end
    
    local ok, err = ngx.timer.at(1, push_timer)
    if not ok then
        ngx.log(ngx.ERR, "Failed to create initial push timer: ", err)
    end
end

-- 获取待推送消息
function push.get_pending_messages(connection_id)
    local red = redis:new()
    red:set_timeout(1000)
    
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return {}
    end
    
    local messages = {}
    local queue_key = "push_queue:" .. connection_id
    
    -- 获取队列中的所有消息
    while true do
        local message_data = red:lpop(queue_key)
        if not message_data or message_data == ngx.null then
            break
        end
        
        local success, message = pcall(cjson.decode, message_data)
        if success then
            table.insert(messages, message)
        end
    end
    
    red:set_keepalive(10000, 100)
    return messages
end

-- 处理推送消息
function push.handle_push_message(connection_info, message, wb)
    if message.type == "subscribe" then
        -- 订阅新频道
        local channel = message.channel
        if channel then
            push.subscribe_channel(connection_info.connection_id, channel)
            
            local response = {
                type = "subscribed",
                channel = channel,
                timestamp = ngx.time()
            }
            
            wb:send_text(cjson.encode(response))
        end
    elseif message.type == "unsubscribe" then
        -- 取消订阅频道
        local channel = message.channel
        if channel then
            push.unsubscribe_channel(connection_info.connection_id, channel)
            
            local response = {
                type = "unsubscribed",
                channel = channel,
                timestamp = ngx.time()
            }
            
            wb:send_text(cjson.encode(response))
        end
    end
end

-- 发送推送消息到用户
function push.send_to_user(user_id, message)
    local red = redis:new()
    red:set_timeout(1000)
    
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return false, err
    end
    
    -- 获取用户的所有连接
    local user_connections = connection_manager.get_user_connections(user_id)
    
    for connection_id, _ in pairs(user_connections) do
        local queue_key = "push_queue:" .. connection_id
        red:rpush(queue_key, cjson.encode(message))
        red:expire(queue_key, 300)  -- 5分钟过期
    end
    
    red:set_keepalive(10000, 100)
    return true
end

-- 发送推送消息到频道
function push.send_to_channel(channel, message)
    local red = redis:new()
    red:set_timeout(1000)
    
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return false, err
    end
    
    -- 发布到Redis频道
    red:publish(channel, cjson.encode(message))
    
    red:set_keepalive(10000, 100)
    return true
end

-- 全局广播
function push.broadcast_global(message)
    return push.send_to_channel("global", message)
end

return push

5.3 在线游戏服务

-- websocket/game.lua - 游戏服务模块
local game = {}
local server = require "resty.websocket.server"
local cjson = require "cjson"
local connection_manager = require "websocket.connection_manager"
local room_manager = require "websocket.room_manager"

-- 游戏状态管理
local game_rooms = {}

-- 处理游戏连接
function game.handle_game_connection()
    local wb, err = server:new({
        timeout = 5000,
        max_payload_len = 65535,
    })
    
    if not wb then
        ngx.log(ngx.ERR, "Failed to create WebSocket server: ", err)
        return ngx.exit(444)
    end
    
    -- 注册游戏特定的消息处理器
    game.register_game_handlers()
    
    -- 消息处理循环
    while true do
        local data, typ, err = wb:recv_frame()
        
        if not data then
            break
        end
        
        if typ == "text" then
            local success, message = pcall(cjson.decode, data)
            if success then
                game.handle_game_message(connection_info, message, wb)
            end
        end
    end
end

-- 注册游戏消息处理器
function game.register_game_handlers()
    -- 创建游戏房间
    message_router.register_handler("create_game", function(connection_info, message)
        local game_type = message.game_type
        local max_players = message.max_players or 2
        
        local game_room_id = "game_" .. ngx.time() .. "_" .. math.random(1000, 9999)
        
        local game_config = {
            max_users = max_players,
            is_private = false,
            description = "Game Room: " .. game_type
        }
        
        local success, err = room_manager.create_room(game_room_id, connection_info.user_id, game_config)
        
        if success then
            -- 初始化游戏状态
            game_rooms[game_room_id] = {
                game_type = game_type,
                max_players = max_players,
                current_players = 0,
                game_state = "waiting",
                players = {},
                created_time = ngx.time()
            }
            
            return {
                type = "game_created",
                game_room_id = game_room_id,
                game_type = game_type,
                max_players = max_players,
                timestamp = ngx.time()
            }
        else
            return {
                type = "error",
                error = err or "Failed to create game",
                timestamp = ngx.time()
            }
        end
    end)
    
    -- 加入游戏
    message_router.register_handler("join_game", function(connection_info, message)
        local game_room_id = message.game_room_id
        
        if not game_room_id then
            return {
                type = "error",
                error = "game_room_id is required",
                timestamp = ngx.time()
            }
        end
        
        local game_room = game_rooms[game_room_id]
        if not game_room then
            return {
                type = "error",
                error = "Game room not found",
                timestamp = ngx.time()
            }
        end
        
        if game_room.current_players >= game_room.max_players then
            return {
                type = "error",
                error = "Game room is full",
                timestamp = ngx.time()
            }
        end
        
        -- 加入房间
        local success, err = room_manager.join_room(game_room_id, connection_info.connection_id, connection_info.user_id)
        
        if success then
            -- 添加玩家到游戏
            game_room.players[connection_info.user_id] = {
                username = connection_info.username,
                connection_id = connection_info.connection_id,
                join_time = ngx.time(),
                ready = false
            }
            game_room.current_players = game_room.current_players + 1
            
            -- 通知其他玩家
            local join_notification = {
                type = "player_joined",
                game_room_id = game_room_id,
                player = {
                    user_id = connection_info.user_id,
                    username = connection_info.username
                },
                current_players = game_room.current_players,
                max_players = game_room.max_players,
                timestamp = ngx.time()
            }
            
            room_manager.broadcast_to_room(game_room_id, join_notification, connection_info.connection_id)
            
            return {
                type = "game_joined",
                game_room_id = game_room_id,
                game_type = game_room.game_type,
                current_players = game_room.current_players,
                max_players = game_room.max_players,
                timestamp = ngx.time()
            }
        else
            return {
                type = "error",
                error = err or "Failed to join game",
                timestamp = ngx.time()
            }
        end
    end)
    
    -- 玩家准备
    message_router.register_handler("player_ready", function(connection_info, message)
        local game_room_id = message.game_room_id
        local ready = message.ready
        
        local game_room = game_rooms[game_room_id]
        if not game_room then
            return {
                type = "error",
                error = "Game room not found",
                timestamp = ngx.time()
            }
        end
        
        local player = game_room.players[connection_info.user_id]
        if not player then
            return {
                type = "error",
                error = "Player not in game",
                timestamp = ngx.time()
            }
        end
        
        player.ready = ready
        
        -- 检查是否所有玩家都准备好了
        local all_ready = true
        local ready_count = 0
        
        for _, p in pairs(game_room.players) do
            if p.ready then
                ready_count = ready_count + 1
            else
                all_ready = false
            end
        end
        
        -- 通知其他玩家
        local ready_notification = {
            type = "player_ready_status",
            game_room_id = game_room_id,
            player_id = connection_info.user_id,
            ready = ready,
            ready_count = ready_count,
            total_players = game_room.current_players,
            timestamp = ngx.time()
        }
        
        room_manager.broadcast_to_room(game_room_id, ready_notification)
        
        -- 如果所有玩家都准备好了,开始游戏
        if all_ready and game_room.current_players >= 2 then
            game.start_game(game_room_id)
        end
        
        return {
            type = "ready_status_updated",
            ready = ready,
            timestamp = ngx.time()
        }
    end)
    
    -- 游戏动作
    message_router.register_handler("game_action", function(connection_info, message)
        local game_room_id = message.game_room_id
        local action = message.action
        
        local game_room = game_rooms[game_room_id]
        if not game_room then
            return {
                type = "error",
                error = "Game room not found",
                timestamp = ngx.time()
            }
        end
        
        if game_room.game_state ~= "playing" then
            return {
                type = "error",
                error = "Game is not in playing state",
                timestamp = ngx.time()
            }
        end
        
        -- 处理游戏动作
        local result = game.process_game_action(game_room_id, connection_info.user_id, action)
        
        if result.success then
            -- 广播游戏状态更新
            local game_update = {
                type = "game_update",
                game_room_id = game_room_id,
                action = action,
                player_id = connection_info.user_id,
                game_state = result.game_state,
                timestamp = ngx.time()
            }
            
            room_manager.broadcast_to_room(game_room_id, game_update)
            
            return {
                type = "action_processed",
                timestamp = ngx.time()
            }
        else
            return {
                type = "error",
                error = result.error,
                timestamp = ngx.time()
            }
        end
    end)
end

-- 开始游戏
function game.start_game(game_room_id)
    local game_room = game_rooms[game_room_id]
    if not game_room then
        return false
    end
    
    game_room.game_state = "playing"
    game_room.start_time = ngx.time()
    
    -- 初始化游戏特定状态
    if game_room.game_type == "tic_tac_toe" then
        game_room.board = {0, 0, 0, 0, 0, 0, 0, 0, 0}  -- 3x3棋盘
        game_room.current_turn = 1  -- 玩家1先手
    elseif game_room.game_type == "rock_paper_scissors" then
        game_room.round = 1
        game_room.player_choices = {}
    end
    
    -- 通知所有玩家游戏开始
    local start_notification = {
        type = "game_started",
        game_room_id = game_room_id,
        game_type = game_room.game_type,
        players = game_room.players,
        timestamp = ngx.time()
    }
    
    room_manager.broadcast_to_room(game_room_id, start_notification)
    
    return true
end

-- 处理游戏动作
function game.process_game_action(game_room_id, player_id, action)
    local game_room = game_rooms[game_room_id]
    
    if game_room.game_type == "tic_tac_toe" then
        return game.process_tic_tac_toe_action(game_room, player_id, action)
    elseif game_room.game_type == "rock_paper_scissors" then
        return game.process_rps_action(game_room, player_id, action)
    else
        return {success = false, error = "Unknown game type"}
    end
end

-- 处理井字棋动作
function game.process_tic_tac_toe_action(game_room, player_id, action)
    local position = action.position
    
    if not position or position < 1 or position > 9 then
        return {success = false, error = "Invalid position"}
    end
    
    if game_room.board[position] ~= 0 then
        return {success = false, error = "Position already taken"}
    end
    
    -- 确定玩家编号
    local player_number = 1
    local player_index = 1
    for uid, _ in pairs(game_room.players) do
        if uid == player_id then
            player_number = player_index
            break
        end
        player_index = player_index + 1
    end
    
    if player_number ~= game_room.current_turn then
        return {success = false, error = "Not your turn"}
    end
    
    -- 执行动作
    game_room.board[position] = player_number
    
    -- 检查胜利条件
    local winner = game.check_tic_tac_toe_winner(game_room.board)
    
    if winner then
        game_room.game_state = "finished"
        game_room.winner = winner
    elseif game.is_board_full(game_room.board) then
        game_room.game_state = "finished"
        game_room.winner = "draw"
    else
        -- 切换回合
        game_room.current_turn = game_room.current_turn == 1 and 2 or 1
    end
    
    return {
        success = true,
        game_state = {
            board = game_room.board,
            current_turn = game_room.current_turn,
            game_status = game_room.game_state,
            winner = game_room.winner
        }
    }
end

-- 检查井字棋胜利条件
function game.check_tic_tac_toe_winner(board)
    local winning_combinations = {
        {1, 2, 3}, {4, 5, 6}, {7, 8, 9},  -- 行
        {1, 4, 7}, {2, 5, 8}, {3, 6, 9},  -- 列
        {1, 5, 9}, {3, 5, 7}              -- 对角线
    }
    
    for _, combo in ipairs(winning_combinations) do
        if board[combo[1]] ~= 0 and 
           board[combo[1]] == board[combo[2]] and 
           board[combo[2]] == board[combo[3]] then
            return board[combo[1]]
        end
    end
    
    return nil
end

-- 检查棋盘是否已满
function game.is_board_full(board)
    for i = 1, 9 do
        if board[i] == 0 then
            return false
        end
    end
    return true
end

return game

6. 性能优化与监控

6.1 性能优化配置

# nginx.conf - WebSocket性能优化
worker_processes auto;
worker_rlimit_nofile 65535;

events {
    worker_connections 65535;
    use epoll;
    multi_accept on;
}

http {
    # WebSocket优化
    proxy_read_timeout 3600s;
    proxy_send_timeout 3600s;
    
    # 连接保持
    keepalive_timeout 65;
    keepalive_requests 1000;
    
    # 缓冲区优化
    client_body_buffer_size 128k;
    client_max_body_size 10m;
    client_header_buffer_size 32k;
    large_client_header_buffers 4 32k;
    
    # Lua优化
    lua_code_cache on;
    lua_max_running_timers 256;
    lua_max_pending_timers 1024;
    
    # 共享内存优化
    lua_shared_dict websocket_connections 500m;
    lua_shared_dict websocket_rooms 200m;
    lua_shared_dict websocket_users 200m;
    lua_shared_dict websocket_stats 100m;
}

6.2 监控指标收集

-- websocket/monitor.lua - 监控模块
local monitor = {}
local cjson = require "cjson"

-- 收集WebSocket指标
function monitor.collect_metrics()
    local connection_stats = connection_manager.get_connection_stats()
    local room_stats = room_manager.get_room_stats()
    
    local metrics = {
        timestamp = ngx.time(),
        connections = {
            total = connection_stats.total_connections,
            active = connection_stats.active_connections,
            users_online = connection_stats.users_online
        },
        rooms = {
            total = room_stats.total_rooms,
            public = room_stats.public_rooms,
            private = room_stats.private_rooms,
            total_members = room_stats.total_members
        },
        performance = {
            memory_usage = collectgarbage("count"),
            uptime = ngx.time() - ngx.shared.websocket_stats:get("start_time") or 0
        }
    }
    
    -- 存储指标到共享内存
    local cache = ngx.shared.websocket_stats
    cache:set("current_metrics", cjson.encode(metrics), 300)
    
    return metrics
end

-- Prometheus格式输出
function monitor.prometheus_metrics()
    local metrics = monitor.collect_metrics()
    
    local output = {}
    
    -- 连接指标
    table.insert(output, string.format("websocket_connections_total %d", metrics.connections.total))
    table.insert(output, string.format("websocket_connections_active %d", metrics.connections.active))
    table.insert(output, string.format("websocket_users_online %d", metrics.connections.users_online))
    
    -- 房间指标
    table.insert(output, string.format("websocket_rooms_total %d", metrics.rooms.total))
    table.insert(output, string.format("websocket_rooms_public %d", metrics.rooms.public))
    table.insert(output, string.format("websocket_rooms_private %d", metrics.rooms.private))
    table.insert(output, string.format("websocket_room_members_total %d", metrics.rooms.total_members))
    
    -- 性能指标
    table.insert(output, string.format("websocket_memory_usage_kb %.2f", metrics.performance.memory_usage))
    table.insert(output, string.format("websocket_uptime_seconds %d", metrics.performance.uptime))
    
    return table.concat(output, "\n")
end

return monitor

7. 部署与运维

7.1 生产环境配置

#!/bin/bash
# deploy_websocket.sh - WebSocket服务部署脚本

set -e

echo "Deploying WebSocket service..."

# 创建目录结构
mkdir -p /usr/local/openresty/lualib/websocket
mkdir -p /var/log/websocket
mkdir -p /var/www/websocket

# 复制Lua模块
cp -r websocket/* /usr/local/openresty/lualib/websocket/

# 设置权限
chown -R nginx:nginx /usr/local/openresty/lualib/websocket
chown -R nginx:nginx /var/log/websocket
chown -R nginx:nginx /var/www/websocket

# 测试配置
nginx -t

if [ $? -eq 0 ]; then
    echo "Configuration test passed"
    
    # 重载Nginx
    nginx -s reload
    
    echo "WebSocket service deployed successfully"
else
    echo "Configuration test failed"
    exit 1
fi

# 启动监控
echo "Starting monitoring..."
systemctl start websocket-monitor
systemctl enable websocket-monitor

echo "Deployment completed"

7.2 健康检查

-- websocket/health.lua - 健康检查模块
local health = {}
local cjson = require "cjson"

-- 健康检查
function health.check()
    local status = {
        status = "healthy",
        timestamp = ngx.time(),
        checks = {}
    }
    
    -- 检查共享内存
    local memory_check = health.check_shared_memory()
    table.insert(status.checks, memory_check)
    
    -- 检查Redis连接
    local redis_check = health.check_redis()
    table.insert(status.checks, redis_check)
    
    -- 检查连接数
    local connection_check = health.check_connections()
    table.insert(status.checks, connection_check)
    
    -- 确定整体状态
    for _, check in ipairs(status.checks) do
        if check.status ~= "ok" then
            status.status = "unhealthy"
            break
        end
    end
    
    return status
end

-- 检查共享内存
function health.check_shared_memory()
    local check = {
        name = "shared_memory",
        status = "ok",
        message = "Shared memory is accessible"
    }
    
    local cache = ngx.shared.websocket_connections
    if not cache then
        check.status = "error"
        check.message = "Shared memory not available"
        return check
    end
    
    -- 测试读写
    local test_key = "health_check_" .. ngx.time()
    local ok, err = cache:set(test_key, "test", 1)
    
    if not ok then
        check.status = "error"
        check.message = "Failed to write to shared memory: " .. (err or "unknown error")
        return check
    end
    
    cache:delete(test_key)
    return check
end

-- 检查Redis连接
function health.check_redis()
    local check = {
        name = "redis",
        status = "ok",
        message = "Redis connection is healthy"
    }
    
    local redis = require "resty.redis"
    local red = redis:new()
    red:set_timeout(1000)
    
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        check.status = "error"
        check.message = "Failed to connect to Redis: " .. (err or "unknown error")
        return check
    end
    
    -- 测试ping
    local res, err = red:ping()
    if not res then
        check.status = "error"
        check.message = "Redis ping failed: " .. (err or "unknown error")
        return check
    end
    
    red:set_keepalive(10000, 100)
    return check
end

-- 检查连接数
function health.check_connections()
    local check = {
        name = "connections",
        status = "ok",
        message = "Connection count is normal"
    }
    
    local stats = connection_manager.get_connection_stats()
    local max_connections = 10000  -- 设置最大连接数阈值
    
    if stats.active_connections > max_connections then
        check.status = "warning"
        check.message = string.format("High connection count: %d/%d", 
                                    stats.active_connections, max_connections)
    end
    
    check.details = {
        active_connections = stats.active_connections,
        users_online = stats.users_online,
        max_connections = max_connections
    }
    
    return check
end

return health

8. 故障排查

8.1 常见问题诊断

-- websocket/diagnostics.lua - 诊断模块
local diagnostics = {}
local cjson = require "cjson"

-- 诊断WebSocket连接问题
function diagnostics.diagnose_connection_issues()
    local issues = {}
    
    -- 检查连接数异常
    local stats = connection_manager.get_connection_stats()
    if stats.active_connections == 0 then
        table.insert(issues, {
            type = "no_connections",
            severity = "warning",
            message = "No active WebSocket connections",
            suggestions = {
                "Check if WebSocket endpoint is accessible",
                "Verify client connection code",
                "Check firewall settings"
            }
        })
    elseif stats.active_connections > 5000 then
        table.insert(issues, {
            type = "high_connections",
            severity = "warning",
            message = string.format("High connection count: %d", stats.active_connections),
            suggestions = {
                "Monitor for connection leaks",
                "Implement connection limits",
                "Check for DDoS attacks"
            }
        })
    end
    
    -- 检查内存使用
    local memory_usage = collectgarbage("count")
    if memory_usage > 500000 then  -- 500MB
        table.insert(issues, {
            type = "high_memory",
            severity = "error",
            message = string.format("High memory usage: %.2f MB", memory_usage / 1024),
            suggestions = {
                "Check for memory leaks",
                "Implement garbage collection",
                "Review shared memory usage"
            }
        })
    end
    
    -- 检查过期连接
    local expired_count = connection_manager.cleanup_expired_connections()
    if expired_count > 100 then
        table.insert(issues, {
            type = "many_expired_connections",
            severity = "warning",
            message = string.format("Found %d expired connections", expired_count),
            suggestions = {
                "Check client heartbeat implementation",
                "Review connection timeout settings",
                "Monitor network stability"
            }
        })
    end
    
    return {
        timestamp = ngx.time(),
        issues_found = #issues,
        issues = issues
    }
end

-- 生成诊断报告
function diagnostics.generate_report()
    local report = {
        timestamp = ngx.time(),
        server_info = {
            openresty_version = ngx.config.nginx_version,
            lua_version = _VERSION,
            worker_id = ngx.worker.id(),
            worker_count = ngx.worker.count()
        },
        connection_issues = diagnostics.diagnose_connection_issues(),
        performance_metrics = monitor.collect_metrics(),
        health_status = health.check()
    }
    
    return report
end

return diagnostics

8.2 管理接口

-- websocket/admin.lua - 管理接口模块
local admin = {}
local cjson = require "cjson"

-- 处理管理请求
function admin.handle_request()
    local method = ngx.var.request_method
    local uri = ngx.var.uri
    
    -- 路由管理请求
    if uri == "/api/ws/stats" then
        admin.handle_stats_request()
    elseif uri == "/api/ws/connections" then
        admin.handle_connections_request()
    elseif uri == "/api/ws/rooms" then
        admin.handle_rooms_request()
    elseif uri == "/api/ws/health" then
        admin.handle_health_request()
    elseif uri == "/api/ws/diagnostics" then
        admin.handle_diagnostics_request()
    elseif uri == "/api/ws/metrics" then
        admin.handle_metrics_request()
    else
        ngx.status = 404
        ngx.say(cjson.encode({error = "Endpoint not found"}))
    end
end

-- 统计信息接口
function admin.handle_stats_request()
    ngx.header.content_type = "application/json"
    
    local connection_stats = connection_manager.get_connection_stats()
    local room_stats = room_manager.get_room_stats()
    
    local stats = {
        connections = connection_stats,
        rooms = room_stats,
        timestamp = ngx.time()
    }
    
    ngx.say(cjson.encode(stats))
end

-- 连接管理接口
function admin.handle_connections_request()
    ngx.header.content_type = "application/json"
    
    if ngx.var.request_method == "GET" then
        local connections = connection_manager.get_all_connections()
        ngx.say(cjson.encode({connections = connections}))
    elseif ngx.var.request_method == "DELETE" then
        -- 断开指定连接
        local args = ngx.req.get_uri_args()
        local connection_id = args.connection_id
        
        if connection_id then
            local success = connection_manager.unregister_connection(connection_id)
            ngx.say(cjson.encode({success = success}))
        else
            ngx.status = 400
            ngx.say(cjson.encode({error = "connection_id is required"}))
        end
    end
end

-- 房间管理接口
function admin.handle_rooms_request()
    ngx.header.content_type = "application/json"
    
    if ngx.var.request_method == "GET" then
        local rooms = room_manager.get_all_rooms()
        ngx.say(cjson.encode({rooms = rooms}))
    elseif ngx.var.request_method == "DELETE" then
        -- 删除指定房间
        local args = ngx.req.get_uri_args()
        local room_id = args.room_id
        
        if room_id then
            local success = room_manager.delete_room(room_id, "admin")
            ngx.say(cjson.encode({success = success}))
        else
            ngx.status = 400
            ngx.say(cjson.encode({error = "room_id is required"}))
        end
    end
end

-- 健康检查接口
function admin.handle_health_request()
    ngx.header.content_type = "application/json"
    
    local health_status = health.check()
    
    if health_status.status == "healthy" then
        ngx.status = 200
    else
        ngx.status = 503
    end
    
    ngx.say(cjson.encode(health_status))
end

-- 诊断接口
function admin.handle_diagnostics_request()
    ngx.header.content_type = "application/json"
    
    local diagnostics_report = diagnostics.generate_report()
    ngx.say(cjson.encode(diagnostics_report))
end

-- Prometheus指标接口
function admin.handle_metrics_request()
    ngx.header.content_type = "text/plain"
    
    local metrics = monitor.prometheus_metrics()
    ngx.say(metrics)
end

return admin

总结

OpenResty的WebSocket支持为构建实时通信应用提供了强大的基础设施。通过合理的架构设计、连接管理、消息路由和性能优化,可以构建高性能、可扩展的实时应用系统。

关键要点

  1. 连接管理:使用共享内存管理连接状态,实现高效的连接跟踪
  2. 消息路由:设计灵活的消息路由机制,支持多种消息类型
  3. 房间管理:实现房间概念,支持群组通信和游戏场景
  4. 性能优化:通过连接池、缓存和异步处理提升性能
  5. 监控运维:建立完善的监控和诊断机制,确保服务稳定性

最佳实践

  • 实现心跳机制确保连接活性
  • 使用消息队列处理跨协程通信
  • 建立完善的错误处理和恢复机制
  • 实施安全认证和访问控制
  • 定期清理过期连接和数据
  • 监控关键性能指标
  • 准备故障排查和恢复方案