1. 微服务架构概述

1.1 微服务架构的特点

微服务架构是一种将单一应用程序开发为一组小型服务的方法:

  • 服务独立:每个服务可以独立开发、部署和扩展
  • 技术多样性:不同服务可以使用不同的技术栈
  • 数据隔离:每个服务拥有自己的数据存储
  • 故障隔离:单个服务的故障不会影响整个系统
  • 团队自治:小团队可以独立负责特定服务

1.2 OpenResty在微服务中的角色

┌─────────────────────────────────────────────────────────┐
│                    客户端应用                           │
└─────────────────┬───────────────────────────────────────┘
                  │
┌─────────────────▼───────────────────────────────────────┐
│                OpenResty API网关                       │
│  ┌─────────────┬─────────────┬─────────────┬─────────┐  │
│  │   认证鉴权   │   路由转发   │   限流熔断   │   监控   │  │
│  └─────────────┴─────────────┴─────────────┴─────────┘  │
└─────────┬───────────┬───────────┬───────────┬───────────┘
          │           │           │           │
┌─────────▼─┐ ┌───────▼─┐ ┌───────▼─┐ ┌───────▼─┐
│  用户服务  │ │ 订单服务 │ │ 支付服务 │ │ 通知服务 │
└───────────┘ └─────────┘ └─────────┘ └─────────┘

1.3 API网关的核心功能

  • 请求路由:根据规则将请求转发到相应的微服务
  • 协议转换:支持HTTP、WebSocket、gRPC等多种协议
  • 认证授权:统一的身份验证和权限控制
  • 限流熔断:保护后端服务免受过载
  • 监控日志:统一的请求监控和日志记录
  • 缓存:减少对后端服务的请求
  • 数据转换:请求和响应的格式转换

2. API网关基础架构

2.1 网关核心配置

# nginx.conf - API gateway configuration
http {
    # Access-log format: client, request line, status, upstream timings,
    # the upstream address that served the request and nginx's $request_id.
    log_format gateway_log '$remote_addr - $remote_user [$time_local] '
                          '"$request" $status $body_bytes_sent '
                          '"$http_referer" "$http_user_agent" '
                          'rt=$request_time uct="$upstream_connect_time" '
                          'uht="$upstream_header_time" urt="$upstream_response_time" '
                          'service="$upstream_addr" request_id="$request_id"';

    # Shared-memory zones reserved for the Lua modules
    lua_shared_dict gateway_cache 100m;
    lua_shared_dict rate_limit 10m;
    lua_shared_dict circuit_breaker 10m;
    lua_shared_dict service_discovery 50m;

    # Lua module search path. Modules are required as "gateway.<name>", so the
    # directory that CONTAINS gateway/ must be on the path; the original entry
    # is kept as well for backward compatibility.
    lua_package_path "/usr/local/openresty/lualib/?.lua;/usr/local/openresty/lualib/gateway/?.lua;;";

    # Master-process initialisation
    init_by_lua_block {
        require "gateway.init"
    }

    # Per-worker initialisation
    init_worker_by_lua_block {
        require "gateway.worker"
    }

    # Backend microservice pools
    upstream user_service {
        server 192.168.1.10:8001;
        server 192.168.1.11:8001;
        keepalive 32;
    }

    upstream order_service {
        server 192.168.1.20:8002;
        server 192.168.1.21:8002;
        keepalive 32;
    }

    upstream payment_service {
        server 192.168.1.30:8003;
        server 192.168.1.31:8003;
        keepalive 32;
    }

    # API gateway virtual server
    server {
        listen 80;
        server_name api.example.com;

        access_log /var/log/nginx/gateway.log gateway_log;

        # The upstream "keepalive" pools are only used when proxying with
        # HTTP/1.1 and without a "Connection: close" header.
        proxy_http_version 1.1;
        proxy_set_header Connection "";

        # NOTE: the gateway access handler is attached per API location (not at
        # server level) so that /health and /admin/ do not inherit it, and it is
        # followed by rewrite_handler in the same phase because nginx runs the
        # rewrite phase BEFORE the access phase (ngx.ctx.route would otherwise
        # not exist yet when rewrite_handler runs).

        # User service route. The sub-path is normalised to start with "/" so
        # that the collection root is proxied as "/" and the query string is
        # forwarded ("$is_args$args"); a proxy_pass URI built from variables
        # replaces the whole request URI, including arguments.
        location ~* ^/api/v1/users(/(.*))?$ {
            set $service_name "user_service";
            set $service_path /$2;
            # Declared so Lua code may assign ngx.var.upstream_service
            set $upstream_service "";

            access_by_lua_block {
                local gateway = require "gateway.core"
                gateway.access_handler()
                gateway.rewrite_handler()
            }

            proxy_pass http://user_service$service_path$is_args$args;

            header_filter_by_lua_block {
                local gateway = require "gateway.core"
                gateway.header_filter_handler()
            }

            log_by_lua_block {
                local gateway = require "gateway.core"
                gateway.log_handler()
            }
        }

        # Order service route
        location ~* ^/api/v1/orders(/(.*))?$ {
            set $service_name "order_service";
            set $service_path /$2;
            set $upstream_service "";

            access_by_lua_block {
                local gateway = require "gateway.core"
                gateway.access_handler()
                gateway.rewrite_handler()
            }

            proxy_pass http://order_service$service_path$is_args$args;

            header_filter_by_lua_block {
                local gateway = require "gateway.core"
                gateway.header_filter_handler()
            }

            log_by_lua_block {
                local gateway = require "gateway.core"
                gateway.log_handler()
            }
        }

        # Payment service route
        location ~* ^/api/v1/payments(/(.*))?$ {
            set $service_name "payment_service";
            set $service_path /$2;
            set $upstream_service "";

            access_by_lua_block {
                local gateway = require "gateway.core"
                gateway.access_handler()
                gateway.rewrite_handler()
            }

            proxy_pass http://payment_service$service_path$is_args$args;

            header_filter_by_lua_block {
                local gateway = require "gateway.core"
                gateway.header_filter_handler()
            }

            log_by_lua_block {
                local gateway = require "gateway.core"
                gateway.log_handler()
            }
        }

        # Health check (not behind the gateway access handler)
        location /health {
            content_by_lua_block {
                local gateway = require "gateway.health"
                gateway.health_check()
            }
        }

        # Admin interface, restricted by source address only
        location /admin/ {
            allow 192.168.1.0/24;
            deny all;

            content_by_lua_block {
                local admin = require "gateway.admin"
                admin.handle_request()
            }
        }

        # Everything else still goes through the gateway access handler (which
        # answers with its JSON 401/404) and never falls back to static files.
        location / {
            access_by_lua_block {
                local gateway = require "gateway.core"
                gateway.access_handler()
            }

            content_by_lua_block {
                ngx.exit(ngx.HTTP_NOT_FOUND)
            }
        }
    }
}

2.2 网关核心模块

-- gateway/core.lua - 网关核心模块
local core = {}
local cjson = require "cjson"
local auth = require "gateway.auth"
local router = require "gateway.router"
local rate_limiter = require "gateway.rate_limiter"
local circuit_breaker = require "gateway.circuit_breaker"
local transformer = require "gateway.transformer"
local monitor = require "gateway.monitor"

-- Build a request identifier of the form
-- "<epoch seconds>-<worker pid>-<random number in 10000..99999>".
local function generate_request_id()
    local timestamp = ngx.time()
    local worker_pid = ngx.worker.pid()
    local nonce = math.random(10000, 99999)

    return string.format("%d-%d-%d", timestamp, worker_pid, nonce)
end

-- Send a JSON error body with the given status and terminate the request.
-- extra_headers is an optional table of response headers to add.
local function reject(status, message, request_id, extra_headers)
    ngx.status = status
    ngx.header.content_type = "application/json"

    if extra_headers then
        for name, value in pairs(extra_headers) do
            ngx.header[name] = value
        end
    end

    ngx.say(cjson.encode({
        error = message,
        request_id = request_id
    }))

    -- ngx.say has already sent the status line set above. Exiting with an
    -- error code at this point would be an attempt to change a sent status,
    -- so ngx.HTTP_OK is used purely to finish the whole request.
    return ngx.exit(ngx.HTTP_OK)
end

-- Access-phase handler: assigns the request id, resolves the route, then runs
-- authentication, authorization, rate limiting and the circuit-breaker check.
-- On success it leaves ngx.ctx.request_id, ngx.ctx.start_time, ngx.ctx.route
-- and (for authenticated requests) ngx.ctx.user populated for later phases.
-- On failure it answers with a JSON error and ends the request.
function core.access_handler()
    -- $request_id is a built-in, read-only nginx variable; assigning it from
    -- Lua raises an error, so it is read instead. The generated id is only a
    -- fallback for builds where the variable is unavailable.
    local request_id = ngx.var.request_id or generate_request_id()
    ngx.ctx.request_id = request_id
    ngx.ctx.start_time = ngx.now()

    -- Record the start of the request
    monitor.record_request_start()

    -- Resolve the route first: whether authentication is needed at all is a
    -- property of the route (route.auth_required).
    local route = router.resolve_route()
    if not route then
        return reject(404, "Route not found", request_id)
    end

    -- Authentication, only for routes that ask for it
    if route.auth_required then
        local auth_result = auth.authenticate()
        if not auth_result.success then
            return reject(auth_result.status or 401,
                auth_result.error or "Authentication failed", request_id)
        end

        ngx.ctx.user = auth_result.user
    end

    ngx.ctx.route = route

    -- Authorization
    if not auth.authorize(ngx.ctx.user, route) then
        return reject(403, "Access denied", request_id)
    end

    -- Rate limiting
    local rate_limit_result = rate_limiter.check_rate_limit(ngx.ctx.user, route)
    if not rate_limit_result.allowed then
        return reject(429, "Rate limit exceeded", request_id, {
            ["X-RateLimit-Limit"] = rate_limit_result.limit,
            ["X-RateLimit-Remaining"] = rate_limit_result.remaining,
            ["X-RateLimit-Reset"] = rate_limit_result.reset_time
        })
    end

    -- Circuit breaker
    local circuit_result = circuit_breaker.check_circuit(route.service)
    if circuit_result.state == "open" then
        return reject(503, "Service temporarily unavailable", request_id)
    end
end

-- Prepares the request that is proxied upstream: applies request
-- transformations and sets the tracing / identity headers.
-- Does nothing unless ngx.ctx.route is set, which core.access_handler does,
-- so this handler only has an effect when it runs after access_handler.
function core.rewrite_handler()
    local route = ngx.ctx.route
    if not route then
        return
    end

    -- Request transformation
    transformer.transform_request(route)

    -- Publish the resolved service name. ngx_lua can only assign nginx
    -- variables that are declared in the configuration; $service_name is the
    -- one the locations declare with "set".
    ngx.var.service_name = route.service

    -- Tracing headers
    ngx.req.set_header("X-Request-ID", ngx.ctx.request_id)
    ngx.req.set_header("X-Gateway-Version", "1.0")

    local user = ngx.ctx.user
    if user then
        ngx.req.set_header("X-User-ID", user.user_id)
        ngx.req.set_header("X-User-Role", user.role)
    else
        -- Never forward identity headers supplied by the client itself
        ngx.req.clear_header("X-User-ID")
        ngx.req.clear_header("X-User-Role")
    end
end

-- Header-filter handler: for requests that have a resolved route, adds the
-- gateway response headers, applies response-header transformations and
-- remembers the response status in ngx.ctx.response_status.
function core.header_filter_handler()
    local matched_route = ngx.ctx.route

    if matched_route then
        local response_headers = ngx.header
        response_headers["X-Request-ID"] = ngx.ctx.request_id
        response_headers["X-Gateway"] = "OpenResty-Gateway"

        transformer.transform_response_headers(matched_route)

        ngx.ctx.response_status = ngx.status
    end
end

-- Log-phase handler: feeds the circuit breaker and the monitor with the
-- outcome of the request.
function core.log_handler()
    local ctx = ngx.ctx
    local end_time = ngx.now()
    local start_time = ctx.start_time or end_time
    local response_time = (end_time - start_time) * 1000  -- milliseconds

    -- Update the circuit breaker only when an upstream was actually tried.
    -- Responses produced by the gateway itself (403/429, or the 503 returned
    -- while the circuit is open) say nothing about the backend's health, and
    -- counting that 503 as a failure would keep feeding the open circuit.
    local upstream_addr = ngx.var.upstream_addr
    if ctx.route and upstream_addr and upstream_addr ~= "" then
        local success = ngx.status < 500
        circuit_breaker.record_result(ctx.route.service, success, response_time)
    end

    -- Record monitoring metrics
    monitor.record_request_end({
        request_id = ctx.request_id,
        service = ctx.route and ctx.route.service,
        method = ngx.var.request_method,
        uri = ngx.var.uri,
        status = ngx.status,
        response_time = response_time,
        user_id = ctx.user and ctx.user.user_id
    })
end

return core

3. 服务发现与路由

3.1 动态路由配置

-- gateway/router.lua - 路由模块
local router = {}
local cjson = require "cjson"
local service_discovery = require "gateway.service_discovery"

-- Route rule configuration.
-- Each entry describes one backend service. router.resolve_route matches
-- `pattern` against the request URI and `methods` against the request method;
-- `service` is the key used to look up instances in service discovery.
-- NOTE(review): `timeout` and `retry` are not read by any code visible in this
-- file; their units (presumably seconds) should be confirmed with the consumer.
local routes = {
    {
        id = "user_service",
        pattern = "^/api/v1/users(/.*)?$",
        service = "user_service",
        methods = {"GET", "POST", "PUT", "DELETE"},
        auth_required = true,  -- checked by auth.authorize
        rate_limit = {
            requests_per_minute = 100,  -- limit used by rate_limiter.check_rate_limit
            burst = 20
        },
        timeout = {
            connect = 5,
            send = 10,
            read = 30
        },
        retry = {
            times = 3,
            delay = 1
        }
    },
    {
        id = "order_service",
        pattern = "^/api/v1/orders(/.*)?$",
        service = "order_service",
        methods = {"GET", "POST", "PUT", "DELETE"},
        auth_required = true,
        rate_limit = {
            requests_per_minute = 200,
            burst = 50
        },
        timeout = {
            connect = 5,
            send = 10,
            read = 30
        }
    },
    {
        id = "payment_service",
        pattern = "^/api/v1/payments(/.*)?$",
        service = "payment_service",
        methods = {"POST", "GET"},
        auth_required = true,
        rate_limit = {
            requests_per_minute = 50,
            burst = 10
        },
        timeout = {
            connect = 3,
            send = 5,
            read = 15
        },
        security = {
            -- NOTE(review): require_https is not enforced by any code visible here
            require_https = true,
            allowed_roles = {"user", "admin"}  -- roles accepted by auth.authorize
        }
    }
}

-- Tell whether `method` is accepted by `route`.
-- A route without a `methods` list (add_route does not require one) accepts
-- every method.
local function is_method_allowed(route, method)
    local methods = route.methods
    if not methods then
        return true
    end

    for _, allowed_method in ipairs(methods) do
        if method == allowed_method then
            return true
        end
    end

    return false
end

-- Resolve the route for the current request.
-- Returns the first route whose pattern matches ngx.var.uri, whose method list
-- accepts the request method and whose service has at least one registered
-- instance (stored on the route as `route.instances`); returns nil otherwise.
function router.resolve_route()
    local uri = ngx.var.uri
    local method = ngx.var.request_method

    for _, route in ipairs(routes) do
        -- Route patterns are regular expressions ("(/.*)?" is an optional
        -- group). Lua's string.match treats the "?" after ")" as a literal
        -- question mark, so such patterns can never match a URI; use the
        -- PCRE-based matcher instead.
        local from, _, err = ngx.re.find(uri, route.pattern, "jo")
        if err then
            ngx.log(ngx.ERR, "Invalid pattern for route ", route.id, ": ", err)
        elseif from and is_method_allowed(route, method) then
            -- Look up the service instances
            local service_instances = service_discovery.get_service_instances(route.service)
            if service_instances and #service_instances > 0 then
                route.instances = service_instances
                return route
            end
        end
    end

    return nil
end

-- Look up a route by its id.
-- Returns the route table, or nil when no route has that id.
function router.get_route_config(route_id)
    local found = nil

    for _, candidate in ipairs(routes) do
        if candidate.id == route_id then
            found = candidate
            break
        end
    end

    return found
end

-- Add a route at runtime, or replace the existing route with the same id.
-- route_config must carry `id`, `pattern` and `service`.
-- Returns true plus "Route updated" / "Route added", or false plus an error
-- message when the configuration is incomplete.
function router.add_route(route_config)
    local is_complete = route_config.id and route_config.pattern and route_config.service
    if not is_complete then
        return false, "Invalid route configuration"
    end

    -- Replace in place when the id is already registered
    for index, existing in ipairs(routes) do
        if existing.id == route_config.id then
            routes[index] = route_config
            return true, "Route updated"
        end
    end

    -- Otherwise append as a new route
    routes[#routes + 1] = route_config
    return true, "Route added"
end

-- Remove the route with the given id.
-- Returns true, "Route removed" on success and false, "Route not found"
-- when no route has that id.
function router.remove_route(route_id)
    local position

    for index, candidate in ipairs(routes) do
        if candidate.id == route_id then
            position = index
            break
        end
    end

    if not position then
        return false, "Route not found"
    end

    table.remove(routes, position)
    return true, "Route removed"
end

-- Return the route list.
-- The module's own table is returned (not a copy), so changes made by the
-- caller affect routing.
function router.get_all_routes()
    local all_routes = routes
    return all_routes
end

-- Summarise the health of the service behind a route.
-- Returns a table with route_id, service, total_instances, healthy_instances
-- and health_ratio (healthy / total, or 0 when there are no instances).
function router.check_route_health(route)
    local instances = service_discovery.get_service_instances(route.service)
    local total, healthy = 0, 0

    if instances then
        total = #instances
        for _, instance in ipairs(instances) do
            if instance.status == "healthy" then
                healthy = healthy + 1
            end
        end
    end

    local ratio = 0
    if total > 0 then
        ratio = healthy / total
    end

    return {
        route_id = route.id,
        service = route.service,
        total_instances = total,
        healthy_instances = healthy,
        health_ratio = ratio
    }
end

return router

3.2 服务发现实现

-- gateway/service_discovery.lua - 服务发现模块
local service_discovery = {}
local cjson = require "cjson"
local http = require "resty.http"

-- Service registry: service name -> list of instance records.
-- Each record carries id, host, port, weight, status ("healthy" is the value
-- the lookups in this module test for) and last_check (ngx.time() at the
-- moment the record was last written); metadata is optional.
-- NOTE(review): this is a plain module-level table, so presumably every nginx
-- worker holds its own copy — confirm whether changes must be shared.
local services = {
    user_service = {
        {
            id = "user-service-1",
            host = "192.168.1.10",
            port = 8001,
            weight = 100,
            status = "healthy",
            last_check = ngx.time(),
            metadata = {
                version = "1.0.0",
                region = "us-west-1"
            }
        },
        {
            id = "user-service-2",
            host = "192.168.1.11",
            port = 8001,
            weight = 100,
            status = "healthy",
            last_check = ngx.time(),
            metadata = {
                version = "1.0.0",
                region = "us-west-1"
            }
        }
    },
    order_service = {
        {
            id = "order-service-1",
            host = "192.168.1.20",
            port = 8002,
            weight = 100,
            status = "healthy",
            last_check = ngx.time()
        },
        {
            id = "order-service-2",
            host = "192.168.1.21",
            port = 8002,
            weight = 100,
            status = "healthy",
            last_check = ngx.time()
        }
    },
    -- NOTE(review): only one payment instance is seeded here; verify this
    -- against the upstream definition used for proxying.
    payment_service = {
        {
            id = "payment-service-1",
            host = "192.168.1.30",
            port = 8003,
            weight = 100,
            status = "healthy",
            last_check = ngx.time()
        }
    }
}

-- Return every registered instance of a service, healthy or not.
-- Unknown services yield an empty table.
function service_discovery.get_service_instances(service_name)
    local registered = services[service_name]
    if registered then
        return registered
    end

    return {}
end

-- Return a new list holding only the instances of a service whose status is
-- "healthy". Unknown services yield an empty list.
function service_discovery.get_healthy_instances(service_name)
    local healthy = {}

    for _, candidate in ipairs(services[service_name] or {}) do
        if candidate.status == "healthy" then
            healthy[#healthy + 1] = candidate
        end
    end

    return healthy
end

-- Register an instance under a service name.
-- An instance whose id is already registered for that service is replaced;
-- otherwise the instance is appended. Always returns true.
function service_discovery.register_service(service_name, instance)
    local registered = services[service_name]
    if not registered then
        registered = {}
        services[service_name] = registered
    end

    -- Replace in place when the id is already known
    for index, existing in ipairs(registered) do
        if existing.id == instance.id then
            registered[index] = instance
            return true
        end
    end

    -- New instance
    registered[#registered + 1] = instance
    return true
end

-- Remove an instance from a service.
-- Returns true when the instance was found and removed, false when the
-- service or the instance id is unknown.
function service_discovery.deregister_service(service_name, instance_id)
    local registered = services[service_name]
    if not registered then
        return false
    end

    local position
    for index, candidate in ipairs(registered) do
        if candidate.id == instance_id then
            position = index
            break
        end
    end

    if position then
        table.remove(registered, position)
        return true
    end

    return false
end

-- Set the status of one instance and stamp it with the current time.
-- Returns true when the instance was found, false otherwise.
function service_discovery.update_instance_status(service_name, instance_id, status)
    local registered = services[service_name]

    if registered then
        for _, candidate in ipairs(registered) do
            if candidate.id == instance_id then
                candidate.status = status
                candidate.last_check = ngx.time()
                return true
            end
        end
    end

    return false
end

-- Probe one instance with GET http://<host>:<port>/health (5 second timeout).
-- Returns true only for an HTTP 200 answer; a failed request is logged at
-- WARN level and reported as false.
function service_discovery.health_check_instance(instance)
    local client = http.new()
    client:set_timeout(5000)  -- milliseconds

    local probe_url = string.format("http://%s:%d/health", instance.host, instance.port)
    local probe_options = {
        method = "GET",
        headers = {
            ["User-Agent"] = "Gateway-HealthChecker/1.0"
        }
    }

    local response, request_err = client:request_uri(probe_url, probe_options)
    if response then
        return response.status == 200
    end

    ngx.log(ngx.WARN, "Health check failed for ", instance.id, ": ", request_err)
    return false
end

-- Probe every registered instance and update its status and last_check.
-- Status changes are logged at INFO level.
-- Returns a table: service name -> list of { instance_id, status, response_time }
-- (response_time is always 0 here; it is a placeholder).
function service_discovery.health_check_all_services()
    local report = {}

    for service_name, registered in pairs(services) do
        local service_report = {}
        report[service_name] = service_report

        for _, instance in ipairs(registered) do
            local new_status = "unhealthy"
            if service_discovery.health_check_instance(instance) then
                new_status = "healthy"
            end

            if new_status ~= instance.status then
                ngx.log(ngx.INFO, "Service instance ", instance.id, " status changed: ",
                       instance.status, " -> ", new_status)
                instance.status = new_status
            end

            instance.last_check = ngx.time()

            service_report[#service_report + 1] = {
                instance_id = instance.id,
                status = new_status,
                response_time = 0  -- placeholder: the response time is not measured
            }
        end
    end

    return report
end

-- Replace local registry entries with the passing instances known to Consul.
-- consul_url is the base URL of the Consul HTTP API.
-- Returns true when the service catalog was fetched and decoded, false
-- otherwise. Services whose instance list cannot be fetched or decoded keep
-- their current registry entry.
function service_discovery.sync_from_consul(consul_url)
    local httpc = http.new()
    httpc:set_timeout(10000)

    local res, err = httpc:request_uri(consul_url .. "/v1/catalog/services", {
        method = "GET"
    })

    if not res or res.status ~= 200 then
        ngx.log(ngx.ERR, "Failed to fetch services from Consul: ",
            err or (res and res.status) or "unknown error")
        return false
    end

    -- cjson.decode raises on malformed input; a bad payload must not abort the caller
    local decoded, consul_services = pcall(cjson.decode, res.body)
    if not decoded or type(consul_services) ~= "table" then
        ngx.log(ngx.ERR, "Failed to decode Consul service catalog")
        return false
    end

    for service_name in pairs(consul_services) do
        -- Fetch the passing instances of this service
        local service_res, service_err = httpc:request_uri(
            consul_url .. "/v1/health/service/" .. ngx.escape_uri(service_name) .. "?passing=true", {
            method = "GET"
        })

        local entries
        if service_res and service_res.status == 200 then
            local ok, body = pcall(cjson.decode, service_res.body)
            if ok and type(body) == "table" then
                entries = body
            end
        end

        if entries then
            local instances = {}

            for _, entry in ipairs(entries) do
                local svc = entry.Service
                if type(svc) == "table" then
                    -- Consul leaves Service.Address empty when the service
                    -- uses the address of the node it runs on
                    local host = svc.Address
                    if type(host) ~= "string" or host == "" then
                        host = type(entry.Node) == "table" and entry.Node.Address or host
                    end

                    -- JSON null decodes to cjson.null, which is truthy
                    local metadata = svc.Meta
                    if type(metadata) ~= "table" then
                        metadata = {}
                    end

                    instances[#instances + 1] = {
                        id = svc.ID,
                        host = host,
                        port = svc.Port,
                        weight = 100,
                        status = "healthy",
                        last_check = ngx.time(),
                        metadata = metadata
                    }
                end
            end

            services[service_name] = instances
        else
            ngx.log(ngx.WARN, "Failed to fetch instances of ", service_name, " from Consul: ",
                service_err or (service_res and service_res.status) or "invalid response")
        end
    end

    return true
end

-- Start the periodic health check: the first run happens after 30 seconds and
-- every run re-arms a new 30 second timer.
function service_discovery.start_health_check_timer()
    local function health_check_timer(premature)
        -- premature is set when the timer fires because the worker is exiting
        if premature then
            return
        end

        -- An error inside one run must not prevent the timer from being
        -- re-armed, otherwise health checking stops for good.
        local ran, run_err = pcall(service_discovery.health_check_all_services)
        if not ran then
            ngx.log(ngx.ERR, "Health check run failed: ", run_err)
        end

        -- Re-arm the timer (one check every 30 seconds)
        local ok, err = ngx.timer.at(30, health_check_timer)
        if not ok then
            ngx.log(ngx.ERR, "Failed to create health check timer: ", err)
        end
    end

    local ok, err = ngx.timer.at(30, health_check_timer)
    if not ok then
        ngx.log(ngx.ERR, "Failed to create initial health check timer: ", err)
    end
end

-- Build per-service statistics.
-- Returns a table: service name -> { total_instances, healthy_instances,
-- health_ratio }, where health_ratio is 0 for a service without instances.
function service_discovery.get_service_stats()
    local stats = {}

    for service_name, registered in pairs(services) do
        local total = #registered
        local healthy = 0

        for _, instance in ipairs(registered) do
            if instance.status == "healthy" then
                healthy = healthy + 1
            end
        end

        local ratio = 0
        if total > 0 then
            ratio = healthy / total
        end

        stats[service_name] = {
            total_instances = total,
            healthy_instances = healthy,
            health_ratio = ratio
        }
    end

    return stats
end

return service_discovery

4. 认证与授权

4.1 JWT认证实现

-- gateway/auth.lua - 认证授权模块
local auth = {}
local jwt = require "resty.jwt"
local cjson = require "cjson"
local redis = require "resty.redis"

-- JWT configuration.
-- The signing secret is taken from the JWT_SECRET environment variable when it
-- is visible to nginx (this requires an `env JWT_SECRET;` directive in the
-- main configuration); otherwise the original placeholder is kept so existing
-- setups behave as before.
-- FLAG(security): the placeholder secret must never be used in production.
local jwt_config = {
    secret = os.getenv("JWT_SECRET") or "your-secret-key",
    algorithm = "HS256",
    expiration = 3600,  -- token lifetime in seconds (1 hour)
    issuer = "api-gateway"
}

-- Open a Redis connection to 127.0.0.1:6379 with a 1000 ms timeout.
-- Returns the client, or nil (after logging the error) when connecting fails.
local function get_redis()
    local client = redis:new()
    client:set_timeout(1000)

    local connected, connect_err = client:connect("127.0.0.1", 6379)
    if connected then
        return client
    end

    ngx.log(ngx.ERR, "Failed to connect to Redis: ", connect_err)
    return nil
end

-- Verify a JWT and return its payload.
-- Returns payload, nil on success and nil, message on failure, where message
-- is "Invalid token", "Token expired" or "Invalid issuer".
local function verify_jwt_token(token)
    local jwt_obj = jwt:verify(jwt_config.secret, token)

    if not jwt_obj.valid then
        -- Keep the library's reason in the log; the client only gets a generic message
        ngx.log(ngx.INFO, "JWT verification failed: ", tostring(jwt_obj.reason))
        return nil, "Invalid token"
    end

    -- Accept only the configured algorithm, so a token cannot choose how its
    -- own signature is checked
    local header = jwt_obj.header
    if type(header) ~= "table" or header.alg ~= jwt_config.algorithm then
        return nil, "Invalid token"
    end

    local payload = jwt_obj.payload
    if type(payload) ~= "table" then
        return nil, "Invalid token"
    end

    -- Expiry check; a non-numeric exp would make the comparison raise
    if payload.exp ~= nil then
        if type(payload.exp) ~= "number" then
            return nil, "Invalid token"
        end

        if payload.exp < ngx.time() then
            return nil, "Token expired"
        end
    end

    -- Issuer check
    if payload.iss ~= jwt_config.issuer then
        return nil, "Invalid issuer"
    end

    return payload, nil
end

-- Tell whether a token id (jti) is present in the Redis blacklist.
-- When Redis cannot be reached the token is treated as not blacklisted.
local function is_token_blacklisted(jti)
    local red = get_redis()

    if red then
        local entry = red:get("blacklist:" .. jti)
        red:set_keepalive(10000, 100)

        return entry and entry ~= ngx.null
    end

    -- Redis unavailable: let the request through
    return false
end

-- Load a user record from Redis (key "user:<user_id>", JSON encoded).
-- Returns the decoded table, or nil when the id is missing, Redis is
-- unavailable, the key does not exist or the stored value is not valid JSON.
local function get_user_info(user_id)
    if user_id == nil then
        return nil
    end

    local red = get_redis()
    if not red then
        return nil
    end

    local user_data, err = red:get("user:" .. tostring(user_id))
    if not user_data then
        -- Command failed: do not return a possibly broken connection to the pool
        ngx.log(ngx.ERR, "Failed to read user from Redis: ", err)
        return nil
    end

    red:set_keepalive(10000, 100)

    if user_data == ngx.null then
        return nil
    end

    -- cjson.decode raises on malformed input
    local ok, user = pcall(cjson.decode, user_data)
    if not ok or type(user) ~= "table" then
        ngx.log(ngx.ERR, "Corrupt user record for user ", tostring(user_id))
        return nil
    end

    return user
end

-- Build the failure result returned by auth.authenticate (always status 401).
local function auth_failure(message)
    return {
        success = false,
        status = 401,
        error = message
    }
end

-- Authenticate the current request from its "Authorization: Bearer <jwt>"
-- header.
-- Returns { success = true, user = {...} } or
-- { success = false, status = 401, error = <message> }.
function auth.authenticate()
    local auth_header = ngx.var.http_authorization

    if not auth_header then
        return auth_failure("Missing authorization header")
    end

    -- Parse the Bearer token. The pattern is anchored so that other schemes
    -- merely containing the word "Bearer" are rejected; the scheme name is
    -- matched case-insensitively.
    local token = string.match(auth_header, "^[Bb][Ee][Aa][Rr][Ee][Rr]%s+(%S+)%s*$")
    if not token then
        return auth_failure("Invalid authorization header format")
    end

    -- Verify the JWT
    local payload, err = verify_jwt_token(token)
    if not payload then
        return auth_failure(err or "Token verification failed")
    end

    -- A token without a subject cannot be mapped to a user
    if payload.sub == nil then
        return auth_failure("Invalid token")
    end

    -- Check the token blacklist
    if payload.jti and is_token_blacklisted(payload.jti) then
        return auth_failure("Token has been revoked")
    end

    -- Load the user record
    local user = get_user_info(payload.sub)
    if not user then
        return auth_failure("User not found")
    end

    -- Check the account status
    if user.status ~= "active" then
        return auth_failure("User account is not active")
    end

    return {
        success = true,
        user = {
            user_id = user.user_id,
            username = user.username,
            email = user.email,
            role = user.role,
            permissions = user.permissions or {},
            token_payload = payload
        }
    }
end

-- Tell whether `wanted` occurs in the sequence `list`.
local function list_contains(list, wanted)
    for _, item in ipairs(list) do
        if item == wanted then
            return true
        end
    end

    return false
end

-- Decide whether `user` may use `route`.
-- Routes without auth_required are always allowed. Otherwise a user is
-- required and must satisfy, when configured on route.security:
-- allowed_roles, every entry of required_permissions, and the resource-level
-- check (resource_check).
function auth.authorize(user, route)
    -- Public route
    if not route.auth_required then
        return true
    end

    -- Protected route without an authenticated user
    if not user then
        return false
    end

    local security = route.security

    -- Role check
    if security and security.allowed_roles then
        if not list_contains(security.allowed_roles, user.role) then
            return false
        end
    end

    -- Permission check: every required permission must be held
    if security and security.required_permissions then
        for _, required_permission in ipairs(security.required_permissions) do
            if not list_contains(user.permissions, required_permission) then
                return false
            end
        end
    end

    -- Resource-level check
    if security and security.resource_check then
        return auth.check_resource_permission(user, route)
    end

    return true
end

-- Resource-level permission check for the current request.
-- user_service: GET/PUT/DELETE are allowed for admins and for the user whose
-- id equals the addressed resource id. order_service: GET is delegated to
-- auth.check_order_ownership. Everything else is allowed.
function auth.check_resource_permission(user, route)
    local method = ngx.var.request_method
    local uri = ngx.var.uri

    -- Extract the resource id: the first path segment AFTER the collection
    -- prefix, taken from the route pattern's first capture group. Using the
    -- last URI segment instead would turn "/api/v1/orders" into the resource
    -- id "orders" and make list queries look like item requests.
    local resource_id
    if route.pattern then
        local captures = ngx.re.match(uri, route.pattern, "jo")
        local sub_path = captures and captures[1]
        if type(sub_path) == "string" then
            resource_id = string.match(sub_path, "^/([^/]+)")
        end
    end

    -- Service-specific rules
    if route.service == "user_service" then
        if method == "GET" or method == "PUT" or method == "DELETE" then
            -- Users may only touch their own resource, unless they are admins
            if user.role == "admin" then
                return true
            end

            -- Compare as strings: the id from the URI is a string while the
            -- stored user id may be a number
            if resource_id and user.user_id ~= nil
                and resource_id == tostring(user.user_id) then
                return true
            end

            return false
        end
    elseif route.service == "order_service" then
        if method == "GET" then
            -- Users may only read their own orders
            return auth.check_order_ownership(user.user_id, resource_id)
        end
    end

    return true
end

-- Tell whether an order belongs to a user (Redis key "order:<id>:owner").
-- Returns true without a lookup when order_id is nil (list query, filtered by
-- the backend) or when Redis is unreachable (left to the backend); otherwise
-- true only when the stored owner equals user_id.
function auth.check_order_ownership(user_id, order_id)
    if not order_id then
        return true  -- list query, filtered by the backend service
    end

    local red = get_redis()
    if not red then
        return true  -- Redis unavailable: allow and let the backend decide
    end

    local order_owner, err = red:get("order:" .. order_id .. ":owner")
    if not order_owner then
        -- Command failed: deny, and do not pool a possibly broken connection
        ngx.log(ngx.ERR, "Failed to read order owner from Redis: ", err)
        return false
    end

    red:set_keepalive(10000, 100)

    if order_owner == ngx.null or user_id == nil then
        return false
    end

    -- Redis returns a string while user_id may be a number
    return tostring(order_owner) == tostring(user_id)
end

-- Issue a signed JWT for a user.
-- The payload carries sub, iss, iat, exp (iat + configured lifetime), a jti of
-- the form "<user_id>-<epoch seconds>-<random>", role and username.
function auth.generate_token(user)
    local issued_at = ngx.time()
    local token_id = string.format("%s-%d-%d", user.user_id, issued_at, math.random(10000, 99999))

    local claims = {
        sub = user.user_id,
        iss = jwt_config.issuer,
        iat = issued_at,
        exp = issued_at + jwt_config.expiration,
        jti = token_id,
        role = user.role,
        username = user.username
    }

    local jwt_header = {
        typ = "JWT",
        alg = jwt_config.algorithm
    }

    return jwt:sign(jwt_config.secret, {
        header = jwt_header,
        payload = claims
    })
end

-- Revoke a token by blacklisting its jti in Redis until the token expires.
-- Returns true on success, or false plus a message when the token is invalid,
-- has no jti, or the blacklist entry could not be written.
function auth.revoke_token(token)
    local payload, err = verify_jwt_token(token)
    if not payload then
        return false, err
    end

    if not payload.jti then
        return false, "Token does not have JTI"
    end

    local red = get_redis()
    if not red then
        return false, "Redis connection failed"
    end

    local key = "blacklist:" .. payload.jti
    local ok, write_err = true, nil

    if payload.exp == nil then
        -- A token without expiry never lapses, so neither may its blacklist entry
        ok, write_err = red:set(key, "1")
    else
        local ttl = payload.exp - ngx.time()
        if ttl > 0 then
            ok, write_err = red:setex(key, ttl, "1")
        end
    end

    if not ok then
        -- Reporting success here would leave the token usable
        ngx.log(ngx.ERR, "Failed to blacklist token: ", write_err)
        return false, "Failed to revoke token"
    end

    red:set_keepalive(10000, 100)
    return true
end

-- Authenticate the current request from its X-API-Key header.
-- The key record is read from Redis (key "api_key:<key>", JSON encoded).
-- Returns { success = true, user = {...} } or
-- { success = false, status = 401 | 500, error = <message> }.
function auth.authenticate_api_key()
    local function failure(status, message)
        return {
            success = false,
            status = status,
            error = message
        }
    end

    local api_key = ngx.var.http_x_api_key

    if not api_key then
        return failure(401, "Missing API key")
    end

    local red = get_redis()
    if not red then
        return failure(500, "Internal server error")
    end

    local api_key_data, err = red:get("api_key:" .. api_key)
    if not api_key_data then
        -- A failed command is a server-side problem, not an invalid key
        ngx.log(ngx.ERR, "Failed to read API key from Redis: ", err)
        return failure(500, "Internal server error")
    end

    red:set_keepalive(10000, 100)

    if api_key_data == ngx.null then
        return failure(401, "Invalid API key")
    end

    -- cjson.decode raises on malformed input
    local ok, key_info = pcall(cjson.decode, api_key_data)
    if not ok or type(key_info) ~= "table" then
        ngx.log(ngx.ERR, "Corrupt API key record in Redis")
        return failure(500, "Internal server error")
    end

    -- Key status
    if key_info.status ~= "active" then
        return failure(401, "API key is not active")
    end

    -- Expiry (JSON null decodes to a non-number and is treated as "no expiry")
    local expires_at = key_info.expires_at
    if type(expires_at) == "number" and expires_at < ngx.time() then
        return failure(401, "API key has expired")
    end

    return {
        success = true,
        user = {
            user_id = key_info.user_id,
            username = key_info.username,
            role = key_info.role or "api_user",
            permissions = key_info.permissions or {},
            api_key_id = key_info.key_id
        }
    }
end

return auth

5. 限流与熔断

5.1 限流实现

-- gateway/rate_limiter.lua - 限流模块
local rate_limiter = {}
local redis = require "resty.redis"

-- Rate-limiting configuration.
-- The defaults apply to routes that define no rate_limit table of their own.
local rate_limit_config = {
    default_requests_per_minute = 100,
    default_burst = 20,
    window_size = 60,  -- sliding-window length in seconds
    cleanup_interval = 300  -- presumably seconds between cleanups of expired data; not used in the code visible here
}

-- Open a Redis connection to 127.0.0.1:6379 with a 1000 ms timeout.
-- Returns the client, or nil (after logging the error) when connecting fails.
local function get_redis()
    local client = redis:new()
    client:set_timeout(1000)

    local connected, connect_err = client:connect("127.0.0.1", 6379)
    if connected then
        return client
    end

    ngx.log(ngx.ERR, "Failed to connect to Redis: ", connect_err)
    return nil
end

-- Per-worker sequence number used to give every counted request its own
-- sorted-set member.
local request_seq = 0

-- Sliding-window rate limit backed by a Redis sorted set.
-- key: Redis key of the window; limit: maximum requests per window;
-- window_size: window length in seconds.
-- Returns allowed (boolean), current request count, remaining requests and
-- the reset time (epoch seconds). When Redis is unavailable or the script
-- fails the request is allowed.
local function sliding_window_rate_limit(key, limit, window_size)
    local red = get_redis()
    if not red then
        return true, limit, 0, ngx.time() + window_size  -- Redis unavailable: allow
    end

    local current_time = ngx.time()
    local window_start = current_time - window_size

    -- The member is built here rather than with math.random inside the script:
    -- Redis versions before 7.0 seed the script PRNG identically on every
    -- execution, so all requests of one second would share a single member and
    -- be counted as one request.
    request_seq = (request_seq % 1000000000) + 1
    local member = string.format("%.3f:%d:%d", ngx.now(), ngx.worker.pid(), request_seq)

    -- A script keeps trim + count + add atomic
    local script = [[
        local key = KEYS[1]
        local current_time = tonumber(ARGV[1])
        local window_start = tonumber(ARGV[2])
        local limit = tonumber(ARGV[3])
        local window_size = tonumber(ARGV[4])
        local member = ARGV[5]

        -- Drop entries that left the window
        redis.call('ZREMRANGEBYSCORE', key, 0, window_start)

        -- Requests currently inside the window
        local current_requests = redis.call('ZCARD', key)

        if current_requests < limit then
            -- Count the current request
            redis.call('ZADD', key, current_time, member)
            redis.call('EXPIRE', key, window_size * 2)
            return {1, current_requests + 1, limit - current_requests - 1, current_time + window_size}
        else
            return {0, current_requests, 0, current_time + window_size}
        end
    ]]

    local result, err = red:eval(script, 1, key, current_time, window_start, limit, window_size, member)
    if not result then
        -- Still fail open, but leave a trace of the failure
        ngx.log(ngx.ERR, "Rate limit script failed for ", key, ": ", err)
        return true, limit, 0, current_time + window_size
    end

    red:set_keepalive(10000, 100)

    if type(result) == "table" and #result == 4 then
        return result[1] == 1, result[2], result[3], result[4]
    end

    return true, limit, 0, current_time + window_size
end

-- Token-bucket rate limit backed by a Redis hash (fields tokens, last_refill).
-- key: Redis key of the bucket; limit: nominal limit reported to the caller;
-- burst: bucket capacity; refill_rate: tokens added per minute (defaults to
-- `limit` when missing or not positive).
-- Returns allowed (boolean), limit, remaining tokens and a reset / retry time
-- (epoch seconds). When Redis is unavailable or the script fails the request
-- is allowed.
local function token_bucket_rate_limit(key, limit, burst, refill_rate)
    local red = get_redis()
    if not red then
        return true, limit, burst, ngx.time() + 60
    end

    refill_rate = tonumber(refill_rate)
    if not refill_rate or refill_rate <= 0 then
        refill_rate = limit
    end

    local current_time = ngx.time()

    local script = [[
        local key = KEYS[1]
        local current_time = tonumber(ARGV[1])
        local limit = tonumber(ARGV[2])
        local burst = tonumber(ARGV[3])
        local refill_rate = tonumber(ARGV[4])

        local bucket_data = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket_data[1]) or burst
        local last_refill = tonumber(bucket_data[2]) or current_time

        -- Whole tokens earned since the last refill (refill_rate per minute)
        local elapsed = current_time - last_refill
        if elapsed < 0 then
            elapsed = 0
            last_refill = current_time
        end

        local tokens_to_add = 0
        if refill_rate > 0 then
            tokens_to_add = math.floor(elapsed * refill_rate / 60)
        end

        if tokens_to_add > 0 then
            tokens = tokens + tokens_to_add
            if tokens >= burst then
                tokens = burst
                last_refill = current_time
            else
                -- Advance only by the time that paid for the added tokens, so
                -- the fraction of a token already earned is not thrown away
                last_refill = last_refill + tokens_to_add * 60 / refill_rate
            end
        elseif tokens >= burst then
            -- A full bucket does not accrue credit
            tokens = burst
            last_refill = current_time
        end

        local retry_after = 60
        if refill_rate > 0 then
            retry_after = math.ceil(60 / refill_rate)
        end

        if tokens >= 1 then
            tokens = tokens - 1
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', last_refill)
            redis.call('EXPIRE', key, 3600)  -- expire after one hour
            return {1, limit, tokens, current_time + 60}
        else
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', last_refill)
            redis.call('EXPIRE', key, 3600)
            return {0, limit, 0, current_time + retry_after}
        end
    ]]

    local result, err = red:eval(script, 1, key, current_time, limit, burst, refill_rate)
    if not result then
        -- Still fail open, but leave a trace of the failure
        ngx.log(ngx.ERR, "Token bucket script failed for ", key, ": ", err)
        return true, limit, burst, current_time + 60
    end

    red:set_keepalive(10000, 100)

    if type(result) == "table" and #result == 4 then
        return result[1] == 1, result[2], result[3], result[4]
    end

    return true, limit, burst, current_time + 60
end

-- Check the per-route rate limit for the current request.
-- The limit comes from route.rate_limit, or from the module defaults when the
-- route defines none. Requests are counted per user and route when a user is
-- given, otherwise per client address and route.
-- Returns { allowed, limit, current, remaining, reset_time }.
function rate_limiter.check_rate_limit(user, route)
    local limit_config = route.rate_limit
    if not limit_config then
        limit_config = {
            requests_per_minute = rate_limit_config.default_requests_per_minute,
            burst = rate_limit_config.default_burst
        }
    end

    local limit = limit_config.requests_per_minute

    -- Build the counter key
    local scope, subject
    if user then
        scope, subject = "user", user.user_id
    else
        scope, subject = "ip", ngx.var.remote_addr
    end
    local rate_limit_key = string.format("rate_limit:%s:%s:%s", scope, subject, route.id)

    -- Sliding-window algorithm
    local allowed, current_requests, remaining, reset_time =
        sliding_window_rate_limit(rate_limit_key, limit, rate_limit_config.window_size)

    local outcome = {}
    outcome.allowed = allowed
    outcome.limit = limit
    outcome.current = current_requests
    outcome.remaining = remaining
    outcome.reset_time = reset_time
    return outcome
end

-- Gateway-wide rate limit: one shared sliding window for all requests.
-- Returns a table { allowed, limit, current, remaining, reset_time }.
function rate_limiter.check_global_rate_limit()
    local key = "rate_limit:global"
    local max_requests = 10000  -- requests allowed per window

    local window = rate_limit_config.window_size
    local ok, used, left, resets_at = sliding_window_rate_limit(key, max_requests, window)

    return {
        allowed = ok,
        limit = max_requests,
        current = used,
        remaining = left,
        reset_time = resets_at
    }
end

-- Per-client-IP rate limit, keyed on ngx.var.remote_addr.
-- Returns a table { allowed, limit, current, remaining, reset_time }.
function rate_limiter.check_ip_rate_limit()
    local max_requests = 200  -- requests allowed per IP per window
    local key = "rate_limit:ip:" .. ngx.var.remote_addr

    local window = rate_limit_config.window_size
    local ok, used, left, resets_at = sliding_window_rate_limit(key, max_requests, window)

    return {
        allowed = ok,
        limit = max_requests,
        current = used,
        remaining = left,
        reset_time = resets_at
    }
end

-- Collect a snapshot of rate-limit usage from Redis.
--
-- @return table { timestamp, global = { current_requests }, users = { [user_id] = count }, ips = {} }
--         or { error = "Redis connection failed" } when Redis is unavailable.
--
-- `ips` is returned empty; this function does not populate it.
function rate_limiter.get_rate_limit_stats()
    local red = get_redis()
    if not red then
        return {error = "Redis connection failed"}
    end

    local stats = {
        timestamp = ngx.time(),
        global = {},
        users = {},
        ips = {}
    }

    -- Global window size. An error reply is not a number, so fall back to 0.
    local global_count = red:zcard("rate_limit:global")
    stats.global.current_requests = type(global_count) == "number" and global_count or 0

    -- Walk user keys with SCAN instead of KEYS: KEYS blocks the Redis server
    -- while it traverses the whole keyspace.
    local seen = {}  -- SCAN may report a key more than once
    local cursor = "0"
    repeat
        local reply, err = red:scan(cursor, "MATCH", "rate_limit:user:*", "COUNT", 100)
        if type(reply) ~= "table" then
            ngx.log(ngx.ERR, "failed to scan rate limit keys: ", err)
            break
        end

        cursor = tostring(reply[1])
        local keys = reply[2]
        if type(keys) == "table" then
            for _, key in ipairs(keys) do
                local user_id = string.match(key, "rate_limit:user:([^:]+):")
                if user_id and not seen[key] then
                    seen[key] = true
                    local count = red:zcard(key)
                    if type(count) ~= "number" then
                        count = 0
                    end
                    -- A user has one key per route; add them up instead of
                    -- letting the last key overwrite the others.
                    stats.users[user_id] = (stats.users[user_id] or 0) + count
                end
            end
        end
    until cursor == "0"

    red:set_keepalive(10000, 100)
    return stats
end

return rate_limiter

5.2 熔断器实现

-- gateway/circuit_breaker.lua - circuit breaker module
local circuit_breaker = {}
local cjson = require "cjson"

-- Circuit breaker configuration
local circuit_config = {
    failure_threshold = 5,      -- failure threshold; should_trip_circuit divides it by 10 to get a failure rate (5 -> 0.5)
    success_threshold = 3,      -- successes needed while half-open before the breaker closes again
    timeout = 60,              -- seconds a breaker stays open before moving to half-open
    window_size = 60,          -- statistics window size (seconds)
    min_requests = 10          -- minimum requests in the window before the breaker may trip
}

-- Circuit breaker states
local CIRCUIT_STATES = {
    CLOSED = "closed",      -- closed (normal operation)
    OPEN = "open",          -- open (tripped)
    HALF_OPEN = "half_open" -- half-open (probing)
}

-- Return the current breaker state for a service.
--
-- Reads "<service>:state" from the circuit_breaker shared dict. A missing
-- dict or missing entry means CLOSED. An OPEN breaker whose timeout has
-- elapsed is promoted to HALF_OPEN and written back before returning.
local function get_circuit_state(service_name)
    local dict = ngx.shared.circuit_breaker
    local raw = dict and dict:get(service_name .. ":state")
    if not raw then
        return CIRCUIT_STATES.CLOSED
    end

    local info = cjson.decode(raw)
    if info.state ~= CIRCUIT_STATES.OPEN then
        return info.state
    end

    -- Still inside the open period: stay open.
    local now = ngx.time()
    if now < info.open_time + circuit_config.timeout then
        return info.state
    end

    -- Timeout elapsed: move to half-open so probe requests can pass.
    info.state = CIRCUIT_STATES.HALF_OPEN
    info.half_open_time = now
    dict:set(service_name .. ":state", cjson.encode(info), 3600)

    ngx.log(ngx.INFO, "Circuit breaker for ", service_name, " changed to HALF_OPEN")

    return info.state
end

-- Persist a new breaker state for a service in the circuit_breaker shared
-- dict (one-hour TTL) and log the transition. No-op when the dict is missing.
local function set_circuit_state(service_name, state)
    local dict = ngx.shared.circuit_breaker
    if not dict then
        return
    end

    local now = ngx.time()
    local record = {
        state = state,
        timestamp = now
    }

    -- OPEN and HALF_OPEN additionally remember when they were entered.
    local entered_field = ({
        [CIRCUIT_STATES.OPEN] = "open_time",
        [CIRCUIT_STATES.HALF_OPEN] = "half_open_time",
    })[state]
    if entered_field then
        record[entered_field] = now
    end

    dict:set(service_name .. ":state", cjson.encode(record), 3600)

    ngx.log(ngx.WARN, "Circuit breaker for ", service_name, " changed to ", state)
end

-- Append one request outcome to the service's rolling statistics and return
-- the updated statistics table.
--
-- Statistics live under "<service>:stats" in the circuit_breaker shared dict
-- as JSON: a list of { timestamp, success, response_time } entries within
-- the last `circuit_config.window_size` seconds plus aggregate counters.
--
-- @param service_name   service identifier used as key prefix
-- @param success        truthy when the upstream call succeeded
-- @param response_time  request duration; non-numeric values count as 0
-- @return stats table, or nil when the shared dict is not available
local function record_request_result(service_name, success, response_time)
    local cache = ngx.shared.circuit_breaker
    if not cache then
        return
    end

    response_time = tonumber(response_time) or 0

    local current_time = ngx.time()
    local window_start = current_time - circuit_config.window_size
    local stats_key = service_name .. ":stats"

    -- Load the previous window. A corrupted entry is discarded instead of
    -- raising out of the request path.
    local previous = {}
    local stats_data = cache:get(stats_key)
    if stats_data then
        local ok, decoded = pcall(cjson.decode, stats_data)
        if ok and type(decoded) == "table" and type(decoded.requests) == "table" then
            previous = decoded.requests
        else
            ngx.log(ngx.WARN, "discarding unreadable circuit breaker stats for ", service_name)
        end
    end

    -- Keep only entries still inside the window and rebuild the aggregates
    -- from them, so the counters always agree with the retained list.
    local stats = {
        requests = {},
        total_requests = 0,
        failed_requests = 0,
        total_response_time = 0
    }

    local function keep(request)
        stats.requests[#stats.requests + 1] = request
        stats.total_requests = stats.total_requests + 1
        stats.total_response_time = stats.total_response_time + (tonumber(request.response_time) or 0)
        if not request.success then
            stats.failed_requests = stats.failed_requests + 1
        end
    end

    for _, request in ipairs(previous) do
        if type(request) == "table" and (tonumber(request.timestamp) or 0) >= window_start then
            keep(request)
        end
    end

    -- Add the new outcome.
    keep({
        timestamp = current_time,
        success = success,
        response_time = response_time
    })

    local ok, err = cache:set(stats_key, cjson.encode(stats), circuit_config.window_size * 2)
    if not ok then
        ngx.log(ngx.ERR, "failed to store circuit breaker stats for ", service_name, ": ", err)
    end

    return stats
end

-- Decide whether the breaker should trip for the given statistics.
-- Requires at least `min_requests` samples, then compares the failure rate
-- against `failure_threshold / 10`.
local function should_trip_circuit(stats)
    local total = stats.total_requests
    if total < circuit_config.min_requests then
        return false
    end

    local observed_rate = stats.failed_requests / total
    local trip_rate = circuit_config.failure_threshold / 10  -- threshold expressed as a ratio

    return observed_rate >= trip_rate
end

-- Report the breaker state for a service as { state, service, timestamp }.
function circuit_breaker.check_circuit(service_name)
    local current_state = get_circuit_state(service_name)
    local checked_at = ngx.time()

    return {
        state = current_state,
        service = service_name,
        timestamp = checked_at
    }
end

-- Record the outcome of an upstream call and drive the breaker state machine.
--
-- CLOSED:    trips to OPEN when should_trip_circuit() says so.
-- HALF_OPEN: a failure re-opens the breaker; a success closes it once at
--            least `success_threshold` successes were recorded in the last
--            30 seconds.
-- OPEN:      the outcome is recorded but the state is left unchanged.
--
-- @param service_name   service identifier
-- @param success        truthy when the call succeeded
-- @param response_time  request duration (defaults to 0)
function circuit_breaker.record_result(service_name, success, response_time)
    local state = get_circuit_state(service_name)

    -- Record statistics.
    local stats = record_request_result(service_name, success, response_time or 0)
    if not stats then
        -- The shared dict is unavailable: there is nothing to evaluate, and
        -- passing nil on would raise inside should_trip_circuit.
        return
    end

    if state == CIRCUIT_STATES.CLOSED then
        -- Closed: check whether the failure rate warrants tripping.
        if should_trip_circuit(stats) then
            set_circuit_state(service_name, CIRCUIT_STATES.OPEN)
        end
    elseif state == CIRCUIT_STATES.HALF_OPEN then
        if success then
            -- Probe succeeded: count successes from the last 30 seconds.
            local recent_successes = 0
            local cutoff = ngx.time() - 30

            for _, request in ipairs(stats.requests) do
                if request.timestamp >= cutoff and request.success then
                    recent_successes = recent_successes + 1
                end
            end

            if recent_successes >= circuit_config.success_threshold then
                set_circuit_state(service_name, CIRCUIT_STATES.CLOSED)
            end
        else
            -- Probe failed: trip again.
            set_circuit_state(service_name, CIRCUIT_STATES.OPEN)
        end
    end
end

-- Summarise breaker state and rolling statistics for one service.
-- Returns { error = ... } when the shared dict is missing; otherwise a table
-- with state, request counters, failure rate and average response time.
function circuit_breaker.get_circuit_stats(service_name)
    local dict = ngx.shared.circuit_breaker
    if not dict then
        return {error = "Circuit breaker cache not available"}
    end

    local current_state = get_circuit_state(service_name)

    local raw = dict:get(service_name .. ":stats")
    local stats
    if raw then
        stats = cjson.decode(raw)
    end
    if not stats then
        stats = {
            total_requests = 0,
            failed_requests = 0,
            total_response_time = 0
        }
    end

    -- Rates stay at 0 while no requests have been recorded.
    local rate, mean_time = 0, 0
    if stats.total_requests > 0 then
        rate = stats.failed_requests / stats.total_requests
        mean_time = stats.total_response_time / stats.total_requests
    end

    return {
        service = service_name,
        state = current_state,
        total_requests = stats.total_requests,
        failed_requests = stats.failed_requests,
        success_requests = stats.total_requests - stats.failed_requests,
        failure_rate = rate,
        avg_response_time = mean_time,
        timestamp = ngx.time()
    }
end

-- Collect get_circuit_stats() for every known service, keyed by service name.
-- The service list is hard-coded here.
function circuit_breaker.get_all_circuits_stats()
    if not ngx.shared.circuit_breaker then
        return {error = "Circuit breaker cache not available"}
    end

    local known_services = {"user_service", "order_service", "payment_service"}  -- could be discovered dynamically

    local report = {}
    for i = 1, #known_services do
        local name = known_services[i]
        report[name] = circuit_breaker.get_circuit_stats(name)
    end

    return report
end

-- Manually reset a breaker by deleting its state and statistics entries.
-- Returns true plus a message, or false plus a reason when the dict is missing.
function circuit_breaker.reset_circuit(service_name)
    local dict = ngx.shared.circuit_breaker
    if not dict then
        return false, "Circuit breaker cache not available"
    end

    for _, suffix in ipairs({":state", ":stats"}) do
        dict:delete(service_name .. suffix)
    end

    ngx.log(ngx.INFO, "Circuit breaker for ", service_name, " has been reset")
    return true, "Circuit breaker reset successfully"
end

return circuit_breaker

6. 监控与管理

6.1 监控指标收集

-- gateway/monitor.lua - monitoring module
local monitor = {}
local cjson = require "cjson"
local http = require "resty.http"

-- Monitoring configuration
-- NOTE(review): none of the functions visible in this section read
-- monitor_config or use `http`; confirm whether they are used elsewhere.
local monitor_config = {
    metrics_interval = 60,     -- metrics collection interval (seconds)
    retention_period = 3600,   -- data retention period (seconds)
    prometheus_enabled = true,
    prometheus_url = "http://localhost:9090"
}

-- Count an incoming request and stamp its start time in ngx.ctx.
-- Does nothing when the gateway_cache shared dict is missing.
function monitor.record_request_start()
    local dict = ngx.shared.gateway_cache
    if dict then
        -- Overall request counter.
        dict:incr("metrics:total_requests", 1, 0)

        -- Per-service request counter, when the service name is known.
        local service_name = ngx.var.service_name
        if service_name then
            dict:incr("metrics:service:" .. service_name .. ":requests", 1, 0)
        end

        -- Status-code counters are updated when the request ends.
        ngx.ctx.request_start_time = ngx.now()
    end
end

-- Record per-status, per-service and per-user metrics for a finished request.
--
-- @param request_info table with fields:
--          service        service name (optional)
--          status         HTTP status, number or numeric string
--          response_time  request duration; non-numeric values count as 0
--          user_id        user identifier (optional)
function monitor.record_request_end(request_info)
    local cache = ngx.shared.gateway_cache
    if not cache then
        return
    end

    local service = request_info.service
    local status = request_info.status
    -- The status may arrive as a string (e.g. from ngx.var.status); compare
    -- numerically so `>= 400` cannot raise.
    local status_code = tonumber(status)
    local response_time = tonumber(request_info.response_time) or 0

    -- Per-status counters.
    if status ~= nil then
        cache:incr("metrics:status:" .. tostring(status), 1, 0)
    end

    if service then
        if status ~= nil then
            cache:incr("metrics:service:" .. service .. ":status:" .. tostring(status), 1, 0)
        end

        -- Response-time accumulators. incr is atomic across workers, whereas
        -- a get followed by set loses updates under concurrency. expire
        -- refreshes the one-hour TTL on every update.
        local response_time_key = "metrics:service:" .. service .. ":response_time"
        cache:incr(response_time_key .. ":total", response_time, 0)
        cache:incr(response_time_key .. ":count", 1, 0)
        cache:expire(response_time_key .. ":total", 3600)
        cache:expire(response_time_key .. ":count", 3600)

        -- Error counter.
        if status_code and status_code >= 400 then
            cache:incr("metrics:service:" .. service .. ":errors", 1, 0)
        end
    end

    -- Per-user request counter.
    if request_info.user_id then
        cache:incr("metrics:user:" .. request_info.user_id .. ":requests", 1, 0)
    end
end

-- Build a metrics snapshot from the gateway_cache shared dict.
-- Returns { error = ... } when the dict is missing. The status-code and
-- service lists are fixed here; `response_times` is returned empty.
function monitor.get_metrics()
    local dict = ngx.shared.gateway_cache
    if not dict then
        return {error = "Cache not available"}
    end

    -- Read a counter, treating a missing key as 0.
    local function counter(key)
        return dict:get(key) or 0
    end

    local snapshot = {
        timestamp = ngx.time(),
        total_requests = counter("metrics:total_requests"),
        status_codes = {},
        services = {},
        response_times = {}
    }

    -- Per-status counters.
    for _, code in ipairs({200, 201, 400, 401, 403, 404, 429, 500, 502, 503}) do
        snapshot.status_codes[tostring(code)] = counter("metrics:status:" .. code)
    end

    -- Per-service counters and derived rates.
    for _, name in ipairs({"user_service", "order_service", "payment_service"}) do
        local prefix = "metrics:service:" .. name
        local request_count = counter(prefix .. ":requests")
        local error_count = counter(prefix .. ":errors")
        local time_total = counter(prefix .. ":response_time:total")
        local time_samples = counter(prefix .. ":response_time:count")

        local error_ratio = 0
        if request_count > 0 then
            error_ratio = error_count / request_count
        end

        local mean_time = 0
        if time_samples > 0 then
            mean_time = time_total / time_samples
        end

        snapshot.services[name] = {
            requests = request_count,
            errors = error_count,
            error_rate = error_ratio,
            avg_response_time = mean_time
        }
    end

    return snapshot
end

-- Render the current metrics in the Prometheus text exposition format.
--
-- Every metric family is emitted as one contiguous group: its HELP and TYPE
-- lines followed directly by all of its samples. Label values are emitted in
-- sorted order so the output is stable between scrapes.
--
-- @return string with the exposition text, or nil plus an error message
function monitor.format_prometheus_metrics()
    local metrics = monitor.get_metrics()
    if metrics.error then
        return nil, metrics.error
    end

    local function sorted_keys(t)
        local keys = {}
        for key in pairs(t) do
            keys[#keys + 1] = key
        end
        table.sort(keys)
        return keys
    end

    local lines = {}
    local function emit(line)
        lines[#lines + 1] = line
    end

    -- Total requests.
    emit("# HELP gateway_requests_total Total number of requests")
    emit("# TYPE gateway_requests_total counter")
    emit(string.format("gateway_requests_total %d", metrics.total_requests))

    -- Requests by status code.
    emit("# HELP gateway_requests_by_status_total Requests by status code")
    emit("# TYPE gateway_requests_by_status_total counter")
    for _, status in ipairs(sorted_keys(metrics.status_codes)) do
        emit(string.format(
            "gateway_requests_by_status_total{status=\"%s\"} %d",
            status, metrics.status_codes[status]))
    end

    -- Per-service families. Samples must directly follow their own header
    -- lines, so iterate family by family rather than service by service.
    local families = {
        {name = "gateway_service_requests_total", help = "Requests by service",
         kind = "counter", field = "requests", value_format = "%d"},
        {name = "gateway_service_errors_total", help = "Errors by service",
         kind = "counter", field = "errors", value_format = "%d"},
        {name = "gateway_service_response_time_avg", help = "Average response time by service",
         kind = "gauge", field = "avg_response_time", value_format = "%.2f"},
    }
    local service_names = sorted_keys(metrics.services)

    for _, family in ipairs(families) do
        emit("# HELP " .. family.name .. " " .. family.help)
        emit("# TYPE " .. family.name .. " " .. family.kind)
        for _, service in ipairs(service_names) do
            emit(string.format(
                "%s{service=\"%s\"} " .. family.value_format,
                family.name, service, metrics.services[service][family.field]))
        end
    end

    return table.concat(lines, "\n")
end

return monitor

6.2 管理接口实现

-- gateway/admin.lua - 管理接口模块
local admin = {}
local cjson = require "cjson"
local router = require "gateway.router"
local circuit_breaker = require "gateway.circuit_breaker"
local rate_limiter = require "gateway.rate_limiter"
local monitor = require "gateway.monitor"
local service_discovery = require "gateway.service_discovery"

-- Entry point for the admin API: dispatch on the URI prefix to the matching
-- handler and answer 404 for anything else. Responses default to JSON.
function admin.handle_request()
    local method = ngx.var.request_method
    local uri = ngx.var.uri

    ngx.header.content_type = "application/json"

    -- Plain (non-pattern) prefix test. In a Lua pattern "-" is a lazy
    -- repetition operator, so "^/admin/circuit-breakers" and
    -- "^/admin/rate-limits" never match the literal hyphenated paths and
    -- those endpoints would always fall through to 404.
    local function has_prefix(prefix)
        return string.find(uri, prefix, 1, true) == 1
    end

    -- Route dispatch.
    if has_prefix("/admin/routes") then
        admin.handle_routes_request(method, uri)
    elseif has_prefix("/admin/services") then
        admin.handle_services_request(method, uri)
    elseif has_prefix("/admin/circuit-breakers") then
        admin.handle_circuit_breakers_request(method, uri)
    elseif has_prefix("/admin/rate-limits") then
        admin.handle_rate_limits_request(method, uri)
    elseif has_prefix("/admin/metrics") then
        admin.handle_metrics_request(method, uri)
    elseif has_prefix("/admin/health") then
        admin.handle_health_request(method, uri)
    else
        ngx.status = 404
        ngx.say(cjson.encode({error = "Endpoint not found"}))
    end
end

-- Route management endpoint.
--
--   GET    /admin/routes        list all routes
--   GET    /admin/routes/<id>   fetch one route (404 when unknown)
--   POST   /admin/routes        add a route from a JSON object body
--   DELETE /admin/routes/<id>   remove a route
--
-- Any other method is answered with 405.
--
-- @param method  HTTP method of the request
-- @param uri     request URI
function admin.handle_routes_request(method, uri)
    if method == "GET" then
        if string.match(uri, "/admin/routes$") then
            -- List all routes.
            local routes = router.get_all_routes()
            ngx.say(cjson.encode({routes = routes}))
        else
            -- Fetch a single route.
            local route_id = string.match(uri, "/admin/routes/([^/]+)$")
            if route_id then
                local route = router.get_route_config(route_id)
                if route then
                    ngx.say(cjson.encode({route = route}))
                else
                    ngx.status = 404
                    ngx.say(cjson.encode({error = "Route not found"}))
                end
            else
                ngx.status = 400
                ngx.say(cjson.encode({error = "Invalid route ID"}))
            end
        end
    elseif method == "POST" then
        -- Add a new route.
        ngx.req.read_body()
        local body = ngx.req.get_body_data()

        if not body then
            ngx.status = 400
            ngx.say(cjson.encode({error = "Request body required"}))
            return
        end

        -- The body must decode to a JSON object or array. Scalars and null
        -- decode successfully but are not a usable route configuration.
        local success, route_config = pcall(cjson.decode, body)
        if not success or type(route_config) ~= "table" then
            ngx.status = 400
            ngx.say(cjson.encode({error = "Invalid JSON"}))
            return
        end

        local ok, message = router.add_route(route_config)
        if ok then
            ngx.status = 201
            ngx.say(cjson.encode({message = message, route = route_config}))
        else
            ngx.status = 400
            ngx.say(cjson.encode({error = message}))
        end
    elseif method == "DELETE" then
        -- Remove a route.
        local route_id = string.match(uri, "/admin/routes/([^/]+)$")
        if route_id then
            local ok, message = router.remove_route(route_id)
            if ok then
                ngx.say(cjson.encode({message = message}))
            else
                ngx.status = 404
                ngx.say(cjson.encode({error = message}))
            end
        else
            ngx.status = 400
            ngx.say(cjson.encode({error = "Route ID required"}))
        end
    else
        ngx.status = 405
        ngx.say(cjson.encode({error = "Method not allowed"}))
    end
end

-- Service management endpoint.
--
--   GET  /admin/services                    statistics for all services
--   GET  /admin/services/<name>             instances of one service
--   POST /admin/services/<name>/instances   register an instance (JSON object body)
--
-- Any other method is answered with 405.
--
-- @param method  HTTP method of the request
-- @param uri     request URI
function admin.handle_services_request(method, uri)
    if method == "GET" then
        if string.match(uri, "/admin/services$") then
            -- Statistics for all services.
            local stats = service_discovery.get_service_stats()
            ngx.say(cjson.encode({services = stats}))
        else
            -- Instances of one service.
            local service_name = string.match(uri, "/admin/services/([^/]+)$")
            if service_name then
                local instances = service_discovery.get_service_instances(service_name)
                ngx.say(cjson.encode({service = service_name, instances = instances}))
            else
                ngx.status = 400
                ngx.say(cjson.encode({error = "Invalid service name"}))
            end
        end
    elseif method == "POST" then
        -- Register a service instance.
        local service_name = string.match(uri, "/admin/services/([^/]+)/instances$")
        if not service_name then
            ngx.status = 400
            ngx.say(cjson.encode({error = "Service name required"}))
            return
        end

        ngx.req.read_body()
        local body = ngx.req.get_body_data()

        if not body then
            ngx.status = 400
            ngx.say(cjson.encode({error = "Request body required"}))
            return
        end

        -- The body must decode to a JSON object or array. Scalars and null
        -- decode successfully but do not describe an instance.
        local success, instance = pcall(cjson.decode, body)
        if not success or type(instance) ~= "table" then
            ngx.status = 400
            ngx.say(cjson.encode({error = "Invalid JSON"}))
            return
        end

        local ok = service_discovery.register_service(service_name, instance)
        if ok then
            ngx.status = 201
            ngx.say(cjson.encode({message = "Service instance registered", instance = instance}))
        else
            ngx.status = 500
            ngx.say(cjson.encode({error = "Failed to register service instance"}))
        end
    else
        ngx.status = 405
        ngx.say(cjson.encode({error = "Method not allowed"}))
    end
end

-- Circuit breaker management endpoint.
--
--   GET  /admin/circuit-breakers                 state of all breakers
--   GET  /admin/circuit-breakers/<name>          state of one breaker
--   POST /admin/circuit-breakers/<name>/reset    reset one breaker
--
-- Any other method is answered with 405.
--
-- The hyphen in the path is written as "%-" in every pattern below: an
-- unescaped "-" is a lazy repetition operator in Lua patterns, so the
-- literal path "/admin/circuit-breakers" would never match.
--
-- @param method  HTTP method of the request
-- @param uri     request URI
function admin.handle_circuit_breakers_request(method, uri)
    if method == "GET" then
        if string.match(uri, "/admin/circuit%-breakers$") then
            -- State of all breakers.
            local stats = circuit_breaker.get_all_circuits_stats()
            ngx.say(cjson.encode({circuit_breakers = stats}))
        else
            -- State of one service's breaker.
            local service_name = string.match(uri, "/admin/circuit%-breakers/([^/]+)$")
            if service_name then
                local stats = circuit_breaker.get_circuit_stats(service_name)
                ngx.say(cjson.encode({circuit_breaker = stats}))
            else
                ngx.status = 400
                ngx.say(cjson.encode({error = "Invalid service name"}))
            end
        end
    elseif method == "POST" then
        -- Reset a breaker.
        local service_name = string.match(uri, "/admin/circuit%-breakers/([^/]+)/reset$")
        if service_name then
            local ok, message = circuit_breaker.reset_circuit(service_name)
            if ok then
                ngx.say(cjson.encode({message = message}))
            else
                ngx.status = 500
                ngx.say(cjson.encode({error = message}))
            end
        else
            ngx.status = 400
            ngx.say(cjson.encode({error = "Service name required"}))
        end
    else
        ngx.status = 405
        ngx.say(cjson.encode({error = "Method not allowed"}))
    end
end

-- Rate-limit endpoint: GET returns the limiter statistics as JSON; every
-- other method is answered with 405.
function admin.handle_rate_limits_request(method, uri)
    if method ~= "GET" then
        ngx.status = 405
        ngx.say(cjson.encode({error = "Method not allowed"}))
        return
    end

    local limiter_stats = rate_limiter.get_rate_limit_stats()
    ngx.say(cjson.encode({rate_limits = limiter_stats}))
end

-- Metrics endpoint. GET on a URI ending in /admin/metrics/prometheus returns
-- the Prometheus text rendering; any other GET returns the JSON snapshot.
-- Non-GET methods are answered with 405.
function admin.handle_metrics_request(method, uri)
    if method ~= "GET" then
        ngx.status = 405
        ngx.say(cjson.encode({error = "Method not allowed"}))
        return
    end

    -- JSON snapshot.
    if not string.match(uri, "/admin/metrics/prometheus$") then
        local snapshot = monitor.get_metrics()
        ngx.say(cjson.encode({metrics = snapshot}))
        return
    end

    -- Prometheus text format.
    local text, err = monitor.format_prometheus_metrics()
    if text then
        ngx.header.content_type = "text/plain"
        ngx.say(text)
        return
    end

    ngx.status = 500
    ngx.header.content_type = "application/json"
    ngx.say(cjson.encode({error = err or "Failed to generate metrics"}))
end

-- Health endpoint: GET returns a JSON document with a fixed "healthy"
-- status, the current time, a version string, service statistics and
-- circuit breaker statistics. Other methods are answered with 405.
function admin.handle_health_request(method, uri)
    if method ~= "GET" then
        ngx.status = 405
        ngx.say(cjson.encode({error = "Method not allowed"}))
        return
    end

    local report = {
        status = "healthy",
        timestamp = ngx.time(),
        version = "1.0.0",
        services = service_discovery.get_service_stats(),
        circuit_breakers = circuit_breaker.get_all_circuits_stats()
    }

    ngx.say(cjson.encode(report))
end

return admin

7. 部署与最佳实践

7.1 生产环境部署

#!/bin/bash
# Production deployment script for the gateway.
# The shebang must be the first line of the file to take effect.

# Abort on the first failing command, on unset variables, and on failures
# inside pipelines, so the service is never started from a half-copied tree.
set -euo pipefail

# Create the directory layout
mkdir -p /opt/gateway/{conf,logs,lua,ssl}
mkdir -p /opt/gateway/lua/{gateway,vendor}

# Copy configuration and Lua sources
cp nginx.conf /opt/gateway/conf/
cp -r lua/gateway/* /opt/gateway/lua/gateway/

# Set ownership and permissions
chown -R nginx:nginx /opt/gateway
chmod -R 755 /opt/gateway

# Start the service and enable it at boot
systemctl start openresty
systemctl enable openresty

7.2 性能优化建议

  • 连接池优化:合理配置upstream连接池大小
  • 缓存策略:充分利用共享内存和Redis缓存
  • 负载均衡:选择合适的负载均衡算法
  • 监控告警:建立完善的监控和告警机制
  • 安全防护:实施多层安全防护措施

7.3 故障排查

  • 日志分析:通过日志快速定位问题
  • 性能监控:实时监控关键指标
  • 熔断恢复:及时处理服务熔断
  • 限流调整:根据业务需求调整限流策略

总结

OpenResty作为API网关在微服务架构中发挥着重要作用,提供了:

  1. 统一入口:所有外部请求的统一接入点
  2. 服务治理:路由、负载均衡、服务发现
  3. 安全控制:认证、授权、限流、防护
  4. 监控运维:日志、监控、管理接口
  5. 高性能:基于Nginx的高并发处理能力

通过合理的架构设计和配置优化,OpenResty可以构建出高性能、高可用的微服务API网关系统。