1. 开发最佳实践
1.1 代码组织与架构
1.1.1 模块化设计
-- 推荐的项目结构
/opt/openresty-app/
├── lua/
│ ├── app/
│ │ ├── init.lua -- 应用初始化
│ │ ├── config/
│ │ │ ├── init.lua -- 配置管理
│ │ │ ├── database.lua -- 数据库配置
│ │ │ └── redis.lua -- Redis配置
│ │ ├── models/
│ │ │ ├── base.lua -- 基础模型
│ │ │ ├── user.lua -- 用户模型
│ │ │ └── product.lua -- 产品模型
│ │ ├── controllers/
│ │ │ ├── base.lua -- 基础控制器
│ │ │ ├── auth.lua -- 认证控制器
│ │ │ └── api.lua -- API控制器
│ │ ├── middleware/
│ │ │ ├── auth.lua -- 认证中间件
│ │ │ ├── rate_limit.lua -- 限流中间件
│ │ │ └── cors.lua -- CORS中间件
│ │ ├── services/
│ │ │ ├── user_service.lua -- 用户服务
│ │ │ └── email_service.lua -- 邮件服务
│ │ └── utils/
│ │ ├── logger.lua -- 日志工具
│ │ ├── validator.lua -- 验证工具
│ │ └── crypto.lua -- 加密工具
│ └── vendor/ -- 第三方库
├── conf/
│ ├── nginx.conf -- Nginx主配置
│ ├── mime.types -- MIME类型
│ └── ssl/ -- SSL证书
└── logs/ -- 日志目录
1.1.2 基础架构模块
-- lua/app/init.lua - 应用初始化
local app = {}
local config = require "app.config"
local logger = require "app.utils.logger"
-- Module-private application state. `start_time` is captured once, when this
-- module is first loaded.
local app_state = {
    version = "1.0.0",
    start_time = ngx.time(),
    initialized = false,
}
--- Initialise the application; does nothing if it is already initialised.
-- Loads the configuration, sets up the database pool when a "database"
-- section exists, loads the Redis client when a "redis" section exists and
-- installs the global error hook.
-- @treturn boolean true on success (or when already initialised), false otherwise
-- @return the error reported by the failing step, when the first value is false
function app.init()
    if app_state.initialized then
        return true
    end

    -- Configuration comes first: every later step reads from it.
    local config_ok, config_err = config.init()
    if not config_ok then
        logger.error("Failed to initialize config: ", config_err)
        return false, config_err
    end

    -- Database connection pool (optional section).
    local db_config = config.get("database")
    if db_config then
        local pool_ok, pool_err = require("app.models.base").init_pool(db_config)
        if not pool_ok then
            logger.error("Failed to initialize database pool: ", pool_err)
            return false, pool_err
        end
    end

    -- Redis (optional section): only the client library is loaded here; the
    -- actual pool set-up is not implemented yet.
    if config.get("redis") then
        require "resty.redis"
    end

    -- Global error hook.
    app.setup_error_handler()

    app_state.initialized = true
    logger.info("Application initialized successfully")
    return true
end
--- Install a global `error` hook that logs and counts every raised Lua error.
-- The hook is installed at most once. A failure inside the reporting path
-- (logger or metrics) can neither recurse endlessly nor replace the error the
-- caller is raising.
function app.setup_error_handler()
    -- Idempotent: wrapping the hook twice would report every error twice.
    if app._error_handler_installed then
        return
    end

    local raw_error = _G.error
    local reporting = false -- re-entrancy guard for the reporting path

    _G.error = function(msg, level)
        if not reporting then
            reporting = true
            -- The reporting code may itself call error(); the guard above plus
            -- pcall keep that from looping or masking the original error.
            pcall(function()
                logger.error("Lua error: ", tostring(msg))
                -- Record the error in the monitoring system.
                local prometheus = require "app.utils.prometheus"
                prometheus.record_error("lua_error", "critical")
            end)
            reporting = false
        end
        return raw_error(msg, level)
    end

    app._error_handler_installed = true
end
--- Return the live application state table (the table itself, not a copy).
-- @treturn table state holding `initialized`, `start_time` and `version`
function app.get_state()
    local state = app_state
    return state
end
--- Graceful shutdown: close the database pool, flush the shared cache and
-- mark the application as uninitialised.
function app.shutdown()
    logger.info("Application shutting down...")

    -- Close the database connection pool.
    local db = require "app.models.base"
    db.close_pool()

    -- Flush the cache. `ngx.shared.cache` is nil when no
    -- `lua_shared_dict cache ...;` is configured, so guard before indexing it.
    local cache = ngx.shared.cache
    if cache then
        cache:flush_all()
    else
        logger.warn("Shared dict 'cache' is not configured; skipping cache flush")
    end

    app_state.initialized = false
    logger.info("Application shutdown completed")
end
return app
1.1.3 基础控制器模式
-- lua/app/controllers/base.lua - 基础控制器
local base_controller = {}
local cjson = require "cjson"
local logger = require "app.utils.logger"
local validator = require "app.utils.validator"
-- HTTP status codes used by the response helpers.
base_controller.HTTP_STATUS = {
    -- 2xx
    OK = 200,
    CREATED = 201,
    -- 4xx
    BAD_REQUEST = 400,
    UNAUTHORIZED = 401,
    FORBIDDEN = 403,
    NOT_FOUND = 404,
    -- 5xx
    INTERNAL_ERROR = 500,
}
--- Create a controller instance bound to the current request.
-- The instance snapshots the request method, URI, query arguments and headers
-- and carries an empty response description. Methods resolve through `self`.
-- @treturn table the new controller instance
function base_controller:new()
    local request = {
        method = ngx.var.request_method,
        uri = ngx.var.request_uri,
        args = ngx.req.get_uri_args(),
        headers = ngx.req.get_headers(),
        body = nil, -- filled lazily by get_request_body()
    }
    local response = {
        status = 200,
        headers = {},
        body = nil,
    }
    return setmetatable({ request = request, response = response }, { __index = self })
end
--- Return the raw request body, reading it on first use.
-- A body that was already read is served from `self.request.body`. When nginx
-- has no in-memory body data, the body file (if any) is read instead.
-- @treturn string|nil the body, or nil when there is none
function base_controller:get_request_body()
    local cached = self.request.body
    if cached then
        return cached
    end

    ngx.req.read_body()
    local payload = ngx.req.get_body_data()
    if not payload then
        -- No in-memory data: fall back to the body file, when one exists.
        local path = ngx.req.get_body_file()
        local handle = path and io.open(path, "r")
        if handle then
            payload = handle:read("*all")
            handle:close()
        end
    end

    self.request.body = payload
    return payload
end
--- Decode the request body as JSON.
-- @return the decoded value on success
-- @return nil plus "No request body" or "Invalid JSON format" on failure
function base_controller:get_json_body()
    local raw = self:get_request_body()
    if not raw then
        return nil, "No request body"
    end

    local decoded_ok, decoded = pcall(cjson.decode, raw)
    if decoded_ok then
        return decoded
    end
    return nil, "Invalid JSON format"
end
--- Validate the merged request parameters against `rules`.
-- Query-string arguments are copied first; fields of a JSON body then
-- override arguments of the same name.
-- @param rules validation rules, passed through to `validator.validate`
-- @return whatever `validator.validate(params, rules)` returns
function base_controller:validate_params(rules)
    local params = {}

    -- Query-string arguments.
    for name, value in pairs(self.request.args) do
        params[name] = value
    end

    -- Only a JSON object/array can contribute fields. Valid JSON such as
    -- `null`, `42` or `"x"` decodes to a non-table value, and calling pairs()
    -- on that would raise.
    local json_body = self:get_json_body()
    if type(json_body) == "table" then
        for name, value in pairs(json_body) do
            params[name] = value
        end
    end

    return validator.validate(params, rules)
end
--- Queue a response header; it is sent when json_response() runs.
-- @param name header name
-- @param value header value
function base_controller:set_header(name, value)
    local headers = self.response.headers
    headers[name] = value
end
--- Send `data` wrapped in the standard JSON envelope.
-- The envelope carries `success` (true for status < 400), `data` and a
-- `timestamp`. All queued headers are written before the body.
-- @param data payload placed under `data`
-- @param[opt] status HTTP status, defaults to HTTP_STATUS.OK
function base_controller:json_response(data, status)
    if not status then
        status = self.HTTP_STATUS.OK
    end
    self:set_header("Content-Type", "application/json; charset=utf-8")

    local envelope = {
        success = status < 400,
        data = data,
        timestamp = ngx.time(),
    }

    ngx.status = status
    -- Flush the queued response headers.
    for header_name, header_value in pairs(self.response.headers) do
        ngx.header[header_name] = header_value
    end

    local payload = cjson.encode(envelope)
    ngx.say(payload)
end
--- Log an error and send it as a JSON response.
-- @param message human-readable error message
-- @param[opt] status HTTP status, defaults to HTTP_STATUS.INTERNAL_ERROR
-- @param[opt] error_code application-specific error code
function base_controller:error_response(message, status, error_code)
    if not status then
        status = self.HTTP_STATUS.INTERNAL_ERROR
    end

    -- Write the error to the log.
    logger.error("Controller error: ", message, ", status: ", status)

    local payload = {
        error = {
            message = message,
            code = error_code,
            request_id = ngx.var.request_id,
        },
    }
    self:json_response(payload, status)
end
--- Send a 200 response carrying `data` and a message.
-- @param data payload placed under `data`
-- @param[opt] message defaults to "Success"
function base_controller:success_response(data, message)
    local text = message or "Success"
    self:json_response({ message = text, data = data }, self.HTTP_STATUS.OK)
end
--- Send a page of items together with pagination metadata.
-- @param items the items on this page
-- @param total total number of items across all pages
-- @param page current page number
-- @param per_page page size
function base_controller:paginated_response(items, total, page, per_page)
    -- A missing or non-positive page size would produce an arithmetic error
    -- or an infinite page count, which cjson refuses to encode. Report zero
    -- pages in that case instead of failing the whole response.
    local total_count = tonumber(total) or 0
    local page_size = tonumber(per_page) or 0
    local total_pages = 0
    if page_size > 0 then
        total_pages = math.ceil(total_count / page_size)
    end

    local response_data = {
        items = items,
        pagination = {
            total = total,
            page = page,
            per_page = per_page,
            total_pages = total_pages,
        },
    }
    self:json_response(response_data)
end
--- Run middleware functions in order, stopping at the first failure.
-- Each middleware is called with the controller and must return a truthy
-- value to continue.
-- @param middlewares array of functions `(controller) -> ok, err`
-- @treturn boolean true when every middleware passed
-- @return the failing middleware's error, when the first value is false
function base_controller:run_middleware(middlewares)
    for _, step in ipairs(middlewares) do
        local passed, reason = step(self)
        if not passed then
            return false, reason
        end
    end
    return true
end
return base_controller
1.2 性能优化实践
1.2.1 连接池优化
-- lua/app/utils/connection_pool.lua - 连接池管理
local connection_pool = {}
local logger = require "app.utils.logger"
-- Keepalive settings per connection type. `max_idle_timeout` is in
-- milliseconds.
local pool_configs = {
    mysql = { max_idle_timeout = 10000, pool_size = 100, backlog = 200 }, -- 10 s
    redis = { max_idle_timeout = 30000, pool_size = 200, backlog = 300 }, -- 30 s
    http = { max_idle_timeout = 60000, pool_size = 50, backlog = 100 },   -- 60 s
}
--- Look up the keepalive settings for a connection type.
-- @param pool_type "mysql", "redis" or "http"
-- @treturn table the matching settings; the mysql settings for unknown types
function connection_pool.get_pool_config(pool_type)
    local settings = pool_configs[pool_type]
    if settings then
        return settings
    end
    return pool_configs.mysql
end
--- Hand a connection back to its keepalive pool.
-- @param connection object exposing `set_keepalive(timeout, size)`
-- @param pool_type selects the settings via get_pool_config()
-- @param pool_name currently unused
-- @treturn boolean true on success, false (after logging) on failure
function connection_pool.set_keepalive(connection, pool_type, pool_name)
    local settings = connection_pool.get_pool_config(pool_type)
    local released, release_err = connection:set_keepalive(
        settings.max_idle_timeout,
        settings.pool_size
    )
    if released then
        return true
    end
    logger.warn("Failed to set keepalive for ", pool_type, ": ", release_err)
    return false
end
--- Return connection pool statistics.
-- Placeholder: real numbers have to come from the actual pool
-- implementation; for now only zeroed mysql counters are reported.
-- @treturn table stats keyed by pool type
function connection_pool.get_pool_stats()
    return {
        mysql = {
            active_connections = 0,
            idle_connections = 0,
            total_connections = 0,
        },
    }
end
return connection_pool
1.2.2 缓存策略优化
-- lua/app/utils/cache_manager.lua - 缓存管理器
local cache_manager = {}
local cjson = require "cjson"
local logger = require "app.utils.logger"
-- Cache tiers, from nearest to farthest.
local CACHE_LEVELS = {
    L1 = "lua_shared", -- Lua shared memory dictionary
    L2 = "redis",      -- Redis
    L3 = "database",   -- database
}
-- Per cache type: time-to-live in seconds, the tiers to use (in lookup
-- order) and whether values are JSON-serialised before storage.
local cache_configs = {
    user_profile = { ttl = 3600, levels = { "L1", "L2" }, serialize = true },   -- 1 hour
    session = { ttl = 1800, levels = { "L1", "L2" }, serialize = true },        -- 30 minutes
    api_response = { ttl = 300, levels = { "L1" }, serialize = true },          -- 5 minutes
    static_config = { ttl = 86400, levels = { "L1", "L2" }, serialize = true }, -- 24 hours
}
--- Look up the configuration for a cache type.
-- @param cache_type key into the cache configuration table
-- @treturn table the matching configuration; `api_response` for unknown types
function cache_manager.get_config(cache_type)
    local settings = cache_configs[cache_type]
    if settings then
        return settings
    end
    return cache_configs.api_response
end
--- Build a cache key by joining `prefix` and the extra parts with ":".
-- Extra parts are stringified; collection stops at the first nil part.
-- @param prefix first key segment
-- @param ... further key segments
-- @treturn string the joined key
function cache_manager.generate_key(prefix, ...)
    local segments = { prefix }
    local extras = { ... }
    local index = 1
    while extras[index] ~= nil do
        segments[#segments + 1] = tostring(extras[index])
        index = index + 1
    end
    return table.concat(segments, ":")
end
--- Read a key from the L1 shared dictionary (`ngx.shared.cache`).
-- @param key cache key
-- @return value, true on a hit; nil, false on a miss; a single nil when the
--   dictionary is not configured
function cache_manager.get_from_l1(key)
    local dict = ngx.shared.cache
    if not dict then
        return nil
    end

    local value = dict:get(key)
    if not value then
        return nil, false
    end

    logger.debug("L1 cache hit: ", key)
    return value, true
end
--- Store a value in the L1 shared dictionary (`ngx.shared.cache`).
-- @param key cache key
-- @param value value to store
-- @param ttl expiry passed to the dictionary's `set`
-- @treturn boolean true when stored, false when the dictionary is missing or
--   the write failed
function cache_manager.set_to_l1(key, value, ttl)
    local dict = ngx.shared.cache
    if not dict then
        return false
    end

    local stored, set_err, evicted = dict:set(key, value, ttl)
    if not stored then
        logger.warn("Failed to set L1 cache: ", key, ", error: ", set_err)
        return false
    end

    -- The third return value signals that other entries were pushed out.
    if evicted then
        logger.debug("L1 cache forcibly set (evicted old data): ", key)
    end
    return true
end
--- Read a key from the L2 cache (Redis at 127.0.0.1:6379).
-- @param key cache key
-- @return value, true on a hit; nil, false on a miss or on any Redis failure
function cache_manager.get_from_l2(key)
    local redis = require "resty.redis"
    local red = redis:new()
    red:set_timeout(1000)

    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        logger.warn("Failed to connect to Redis: ", err)
        return nil, false
    end

    local value, get_err = red:get(key)
    if not value then
        -- The command failed. Do not return this connection to the pool: it
        -- may be broken or still have an unread reply pending.
        logger.warn("Failed to get L2 cache: ", key, ", error: ", get_err)
        red:close()
        return nil, false
    end

    local pooled, pool_err = red:set_keepalive(10000, 100)
    if not pooled then
        logger.warn("Failed to set Redis keepalive: ", pool_err)
    end

    -- A missing key comes back as ngx.null, not nil.
    if value ~= ngx.null then
        logger.debug("L2 cache hit: ", key)
        return value, true
    end
    return nil, false
end
--- Store a value in the L2 cache (Redis at 127.0.0.1:6379) with an expiry.
-- @param key cache key
-- @param value value to store
-- @param ttl expiry in seconds, passed to SETEX
-- @treturn boolean true when stored, false on any Redis failure
function cache_manager.set_to_l2(key, value, ttl)
    local redis = require "resty.redis"
    local red = redis:new()
    red:set_timeout(1000)

    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        logger.warn("Failed to connect to Redis: ", err)
        return false
    end

    local stored, set_err = red:setex(key, ttl, value)
    if not stored then
        -- Do not return a connection to the pool after a failed command.
        logger.warn("Failed to set L2 cache: ", key, ", error: ", set_err)
        red:close()
        return false
    end

    local pooled, pool_err = red:set_keepalive(10000, 100)
    if not pooled then
        logger.warn("Failed to set Redis keepalive: ", pool_err)
    end
    return true
end
--- Read a value through the configured cache tiers.
-- Tiers are tried in configuration order. On a hit the value is
-- JSON-decoded when the cache type is serialised (a value that fails to
-- decode is returned as stored) and the tiers in front of the hit are
-- backfilled.
-- @param cache_type cache type; also the key prefix
-- @param key main key part
-- @param ... further key parts
-- @return the cached value, or nil when no tier has it
function cache_manager.get(cache_type, key, ...)
    local full_key = cache_manager.generate_key(cache_type, key, ...)
    local config = cache_manager.get_config(cache_type)

    local readers = {
        L1 = cache_manager.get_from_l1,
        L2 = cache_manager.get_from_l2,
    }

    for _, level in ipairs(config.levels) do
        local reader = readers[level]
        local value, hit
        if reader then
            value, hit = reader(full_key)
        end

        if hit then
            -- Deserialise.
            if config.serialize and value then
                local decoded_ok, decoded = pcall(cjson.decode, value)
                if decoded_ok then
                    value = decoded
                end
            end
            -- Copy the value into the tiers that were tried before this one.
            cache_manager.backfill_cache(cache_type, full_key, value, level)
            return value
        end
    end

    return nil
end
--- Write a value to every tier configured for the cache type.
-- @param cache_type cache type; also the key prefix
-- @param key main key part
-- @param value value to cache (JSON-encoded when the type is serialised)
-- @param[opt] custom_ttl overrides the configured TTL
-- @param ... further key parts
-- @treturn boolean always true; per-tier failures are only logged by the
--   tier writers
function cache_manager.set(cache_type, key, value, custom_ttl, ...)
    local full_key = cache_manager.generate_key(cache_type, key, ...)
    local config = cache_manager.get_config(cache_type)
    local ttl = custom_ttl or config.ttl

    -- Serialise once, up front.
    local payload
    if config.serialize then
        payload = cjson.encode(value)
    else
        payload = value
    end

    local writers = {
        L1 = cache_manager.set_to_l1,
        L2 = cache_manager.set_to_l2,
    }
    for _, level in ipairs(config.levels) do
        local writer = writers[level]
        if writer then
            writer(full_key, payload, ttl)
        end
    end

    return true
end
--- Copy a value into the tiers that come before `hit_level`.
-- Only L1 is written; the configured TTL of the cache type is used.
-- @param cache_type cache type (selects the configuration)
-- @param key full cache key
-- @param value decoded value (re-encoded when the type is serialised)
-- @param hit_level tier name where the value was found
function cache_manager.backfill_cache(cache_type, key, value, hit_level)
    local config = cache_manager.get_config(cache_type)

    local payload = value
    if config.serialize then
        payload = cjson.encode(value)
    end

    -- Walk the tiers in order and stop at the one that produced the hit.
    local levels = config.levels
    local index = 1
    while levels[index] ~= nil and levels[index] ~= hit_level do
        if levels[index] == "L1" then
            cache_manager.set_to_l1(key, payload, config.ttl)
        end
        index = index + 1
    end
end
--- Remove a key from every tier configured for the cache type.
-- Redis failures are logged; they do not stop the remaining tiers.
-- @param cache_type cache type; also the key prefix
-- @param key main key part
-- @param ... further key parts
function cache_manager.delete(cache_type, key, ...)
    local full_key = cache_manager.generate_key(cache_type, key, ...)
    local config = cache_manager.get_config(cache_type)

    for _, level in ipairs(config.levels) do
        if level == "L1" then
            local shared_dict = ngx.shared.cache
            if shared_dict then
                shared_dict:delete(full_key)
            end
        elseif level == "L2" then
            local redis = require "resty.redis"
            local red = redis:new()
            red:set_timeout(1000)

            local ok, err = red:connect("127.0.0.1", 6379)
            if not ok then
                -- A silent failure here leaves a stale entry in Redis.
                logger.warn("Failed to connect to Redis: ", err)
            else
                local deleted, del_err = red:del(full_key)
                if deleted then
                    red:set_keepalive(10000, 100)
                else
                    -- Never pool a connection after a failed command.
                    logger.warn("Failed to delete L2 cache: ", full_key, ", error: ", del_err)
                    red:close()
                end
            end
        end
    end
end
--- Collect cache statistics.
-- Only L1 figures (from `ngx.shared.cache`) are filled in; `l2_stats` stays
-- empty.
-- @treturn table `{ l1_stats = {...}, l2_stats = {} }`
function cache_manager.get_stats()
    local stats = { l1_stats = {}, l2_stats = {} }

    local dict = ngx.shared.cache
    if not dict then
        return stats
    end

    -- L1 figures straight from the shared dictionary.
    stats.l1_stats = {
        capacity = dict:capacity(),
        free_space = dict:free_space(),
        used_space = dict:capacity() - dict:free_space(),
    }
    return stats
end
return cache_manager
2. 生产环境案例分析
2.1 高并发API网关案例
2.1.1 架构设计
┌─────────────────────────────────────────────────────────────┐
│ Load Balancer │
│ (HAProxy/F5) │
└─────────────────┬───────────────────────────────────────────┘
│
┌─────────────┼─────────────┐
│ │ │
┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│ OR-1 │ │ OR-2 │ │ OR-3 │
│Gateway│ │Gateway│ │Gateway│
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
└─────────────┼─────────────┘
│
┌─────────▼─────────┐
│ Service Mesh │
│ (Consul/Etcd) │
└─────────┬─────────┘
│
┌─────────────┼─────────────┐
│ │ │
┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│Service│ │Service│ │Service│
│ A │ │ B │ │ C │
└───────┘ └───────┘ └───────┘
2.1.2 核心配置
# nginx.conf - high-concurrency API gateway

# Worker processes: count and CPU pinning chosen automatically.
worker_processes auto;
worker_cpu_affinity auto;
# Open-file limit for each worker process.
worker_rlimit_nofile 65535;

events {
    use epoll;
    multi_accept on;
    worker_connections 4096;
}
http {
# Base settings
include mime.types;
default_type application/octet-stream;

# Performance
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
keepalive_requests 1000;

# Buffers
client_body_buffer_size 128k;
client_max_body_size 10m;
client_header_buffer_size 32k;
large_client_header_buffers 4 32k;

# Compression
gzip on;
gzip_vary on;
gzip_min_length 1000;
gzip_comp_level 6;
# RSS feeds use "application/rss+xml"; "application/xml+rss" is not a
# registered type and never matches a response, so feeds were left
# uncompressed.
gzip_types
    text/plain
    text/css
    text/xml
    text/javascript
    application/json
    application/javascript
    application/rss+xml
    application/atom+xml;
# Lua settings: module search path and code cache.
lua_package_path '/opt/openresty-app/lua/?.lua;/opt/openresty-app/lua/?/init.lua;;';
lua_code_cache on;

# Shared memory dictionaries (name, size).
lua_shared_dict cache      100m;
lua_shared_dict locks      10m;
lua_shared_dict rate_limit 10m;
lua_shared_dict sessions   50m;
lua_shared_dict metrics    10m;
# Initialisation
init_by_lua_block {
    local app = require "app.init"
    -- app.init() reports failure through its return values; surface it in
    -- the error log instead of silently starting with a half-initialised
    -- application.
    local ok, err = app.init()
    if not ok then
        ngx.log(ngx.ERR, "application initialization failed: ", tostring(err))
    end
}

init_worker_by_lua_block {
    local prometheus = require "app.utils.prometheus"
    prometheus.init_worker()

    local health_check = require "app.utils.health_check"
    health_check.start_background_checks()
}
# Upstream groups. Both use least-connections balancing, mark a server as
# failed after 3 errors within 30s, and keep up to 32 idle upstream
# connections per worker.
upstream backend_service_a {
    least_conn;
    server 10.0.1.10:8080 max_fails=3 fail_timeout=30s;
    server 10.0.1.11:8080 max_fails=3 fail_timeout=30s;
    server 10.0.1.12:8080 max_fails=3 fail_timeout=30s;
    keepalive 32;
}

upstream backend_service_b {
    least_conn;
    server 10.0.2.10:8080 max_fails=3 fail_timeout=30s;
    server 10.0.2.11:8080 max_fails=3 fail_timeout=30s;
    keepalive 32;
}
# Access log format: the usual combined fields plus X-Forwarded-For, total
# request time and upstream response time.
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                '$status $body_bytes_sent "$http_referer" '
                '"$http_user_agent" "$http_x_forwarded_for" '
                '$request_time $upstream_response_time';
# Server
server {
    listen 80;
    server_name api.example.com;

    access_log /var/log/nginx/access.log main;
    error_log /var/log/nginx/error.log warn;

    # Variables the gateway middleware touches through ngx.var. ngx_lua can
    # only assign to variables that are already defined in the configuration,
    # so $backend_host / $backend_port must be declared here. The empty
    # default for $service keeps it initialised in locations that do not set
    # it (/health, /metrics).
    set $service "";
    set $backend_host "";
    set $backend_port "";

    # Global middleware
    access_by_lua_block {
        local middleware = require "app.middleware.gateway"
        middleware.run()
    }

    # API routes
    location ~ ^/api/v1/users/(.*) {
        set $service "user_service";
        set $path $1;

        proxy_pass http://backend_service_a/users/$path$is_args$args;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;

        # Upstream keepalive needs HTTP/1.1 and an empty Connection header
        proxy_http_version 1.1;
        proxy_set_header Connection "";
    }

    location ~ ^/api/v1/products/(.*) {
        set $service "product_service";
        set $path $1;

        proxy_pass http://backend_service_b/products/$path$is_args$args;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;

        proxy_http_version 1.1;
        proxy_set_header Connection "";
    }

    # Health check
    location /health {
        access_log off;
        content_by_lua_block {
            local health = require "app.utils.health_check"
            local result = health.simple_check()
            ngx.header.content_type = "application/json"
            ngx.say(require("cjson").encode(result))
        }
    }

    # Metrics, reachable from loopback and 10.0.0.0/8 only
    location /metrics {
        access_log off;
        allow 127.0.0.1;
        allow 10.0.0.0/8;
        deny all;
        content_by_lua_block {
            local prometheus = require "app.utils.prometheus"
            ngx.header.content_type = "text/plain"
            ngx.say(prometheus.collect())
        }
    }
}
}
2.1.3 网关中间件
-- lua/app/middleware/gateway.lua - 网关中间件
local gateway_middleware = {}
local cjson = require "cjson"
local logger = require "app.utils.logger"
local rate_limiter = require "app.middleware.rate_limit"
local auth = require "app.middleware.auth"
local prometheus = require "app.utils.prometheus"
-- Names of the middleware functions, in the order run() executes them.
local middleware_chain = {
    "request_id", "cors", "rate_limit", "auth",
    "service_discovery", "circuit_breaker", "request_transform", "metrics",
}
--- Determine the request id and expose it as the X-Request-ID response header.
-- Uses nginx's built-in `$request_id` when available, otherwise
-- "<connection>-<connection_requests>". The id is also stored in
-- `ngx.ctx.request_id`.
-- @treturn boolean always true
function gateway_middleware.request_id()
    -- `$request_id` is a read-only built-in variable: assigning to it through
    -- ngx.var raises an error, which would fail every request. Keep the
    -- chosen id in ngx.ctx instead.
    local request_id = ngx.var.request_id
    if not request_id or request_id == "" then
        request_id = ngx.var.connection .. "-" .. ngx.var.connection_requests
    end

    ngx.ctx.request_id = request_id
    ngx.header["X-Request-ID"] = request_id
    return true
end
--- Emit CORS headers and answer preflight (OPTIONS) requests with 204.
-- @treturn boolean true for non-OPTIONS requests; OPTIONS requests end here
function gateway_middleware.cors()
    local origin = ngx.var.http_origin
    if origin then
        -- NOTE(review): reflecting an arbitrary Origin together with
        -- Allow-Credentials lets any site make credentialed requests. This
        -- should be restricted to an allowlist of trusted origins.
        ngx.header["Access-Control-Allow-Origin"] = origin
        -- The response now depends on the request's Origin header; caches
        -- must not reuse it for a different origin.
        ngx.header["Vary"] = "Origin"
        ngx.header["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS"
        ngx.header["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Requested-With"
        ngx.header["Access-Control-Allow-Credentials"] = "true"
        ngx.header["Access-Control-Max-Age"] = "86400"
    end

    if ngx.var.request_method == "OPTIONS" then
        ngx.status = 204
        return ngx.exit(204)
    end

    return true
end
-- Send a 429 JSON response (with a matching Retry-After header) and end the
-- request.
local function reject_rate_limited(message, retry_after)
    prometheus.record_error("rate_limit", "warning")
    ngx.status = 429
    ngx.header.content_type = "application/json"
    ngx.header["Retry-After"] = retry_after
    ngx.say(cjson.encode({
        error = "Rate limit exceeded",
        message = message,
        retry_after = retry_after
    }))
    return ngx.exit(429)
end

--- Enforce rate limits per client IP and, when present, per API key.
-- Limits: 100 requests / 60 s per IP, 1000 requests / 60 s per API key
-- (taken from the X-API-Key request header).
-- @treturn boolean true when the request is within limits; otherwise the
--   request is ended with a 429 response
function gateway_middleware.rate_limit()
    local client_ip = ngx.var.remote_addr
    local api_key = ngx.var.http_x_api_key
    local window = 60

    -- Per-IP limit.
    local ip_limit_ok = rate_limiter.check_rate_limit("ip:" .. client_ip, 100, window)
    if not ip_limit_ok then
        logger.warn("IP rate limit exceeded: ", client_ip)
        return reject_rate_limited("Too many requests from IP", window)
    end

    -- Per-API-key limit. An empty header is treated like a missing one.
    if api_key and api_key ~= "" then
        local key_limit_ok = rate_limiter.check_rate_limit("key:" .. api_key, 1000, window)
        if not key_limit_ok then
            -- API keys are credentials: log only a short prefix.
            logger.warn("API key rate limit exceeded: ", api_key:sub(1, 4), "...")
            return reject_rate_limited("Too many requests for API key", window)
        end
    end

    return true
end
--- Require a valid token for every path that is not public.
-- @treturn boolean true when the path is public or the token verifies;
--   otherwise the request is ended with a 401 response
function gateway_middleware.auth()
    local path = ngx.var.uri

    -- Paths that do not need authentication.
    local public_paths = {
        "/health",
        "/metrics",
        "/api/v1/auth/login",
        "/api/v1/auth/register"
    }

    -- Plain string comparison on a path-segment boundary: a public path
    -- matches itself and anything below it ("/health", "/health/live") but
    -- not look-alikes such as "/healthXYZ". This also avoids treating
    -- characters like "-" or "." in a path as Lua pattern operators.
    for _, public_path in ipairs(public_paths) do
        if path == public_path
            or path:sub(1, #public_path + 1) == public_path .. "/" then
            return true
        end
    end

    -- Verify the token.
    local auth_ok, auth_err = auth.verify_token()
    if not auth_ok then
        logger.warn("Authentication failed: ", tostring(auth_err))
        prometheus.record_error("auth_failed", "warning")

        ngx.status = 401
        ngx.header.content_type = "application/json"
        ngx.say(cjson.encode({
            error = "Authentication required",
            message = auth_err
        }))
        return ngx.exit(401)
    end

    return true
end
--- Pick a healthy backend instance for the service named in `$service`.
-- The chosen host and port are written to `$backend_host` / `$backend_port`.
-- Both variables must be declared in the nginx configuration (for example
-- `set $backend_host "";`), because ngx.var can only assign to variables
-- that already exist.
-- @treturn boolean true when no service is set or an instance was selected;
--   otherwise the request is ended with a 503 response
function gateway_middleware.service_discovery()
    local service = ngx.var.service
    -- A declared but unset nginx variable reads as "" (truthy in Lua), so an
    -- empty name has to be treated as "no service" as well.
    if not service or service == "" then
        return true
    end

    -- Ask the service registry for healthy instances.
    local consul = require "app.utils.consul"
    local instances = consul.get_healthy_instances(service)

    -- Load-balance across the instances.
    local selected_instance
    if instances and #instances > 0 then
        selected_instance = consul.select_instance(instances)
    end

    if not selected_instance then
        logger.error("No healthy instances for service: ", service)
        prometheus.record_error("service_unavailable", "critical")

        ngx.status = 503
        ngx.header.content_type = "application/json"
        ngx.say(cjson.encode({
            error = "Service unavailable",
            message = "No healthy instances available"
        }))
        return ngx.exit(503)
    end

    ngx.var.backend_host = selected_instance.host
    ngx.var.backend_port = selected_instance.port
    return true
end
--- Reject the request when the target service's circuit breaker is open.
-- @treturn boolean true when no service is set or the breaker is not open;
--   otherwise the request is ended with a 503 response
function gateway_middleware.circuit_breaker()
    local service = ngx.var.service
    -- A declared but unset nginx variable reads as "" (truthy in Lua), so an
    -- empty name has to be treated as "no service" as well.
    if not service or service == "" then
        return true
    end

    local circuit_breaker = require "app.utils.circuit_breaker"
    local state = circuit_breaker.get_state(service)
    if state == "open" then
        logger.warn("Circuit breaker open for service: ", service)
        prometheus.record_error("circuit_breaker_open", "warning")

        ngx.status = 503
        ngx.header.content_type = "application/json"
        ngx.say(cjson.encode({
            error = "Service temporarily unavailable",
            message = "Circuit breaker is open"
        }))
        return ngx.exit(503)
    end

    return true
end
--- Adjust the request headers before the request is proxied.
-- Adds tracing headers and removes the internal-only token header.
-- @treturn boolean always true
function gateway_middleware.request_transform()
    local set_header = ngx.req.set_header

    -- Tracing headers.
    set_header("X-Trace-ID", ngx.var.request_id)
    set_header("X-Gateway-Time", ngx.time())

    -- Internal header that must not be forwarded.
    ngx.req.clear_header("X-Internal-Token")

    return true
end
--- Record the request start time and register the request with the metrics
-- module.
-- Status and duration are passed as 0 here; per the original note they are
-- meant to be recorded later, in the log phase.
-- @treturn boolean always true
function gateway_middleware.metrics()
    ngx.ctx.start_time = ngx.now()

    local method, uri = ngx.var.request_method, ngx.var.uri
    prometheus.record_request(method, 0, uri, 0)

    return true
end
-- Report a middleware that raised an error and end the request with a 500.
local function abort_on_middleware_error(name, failure)
    logger.error("Middleware error (", name, "): ", failure)
    prometheus.record_error("middleware_error", "critical")

    ngx.status = 500
    ngx.header.content_type = "application/json"
    ngx.say(cjson.encode({
        error = "Internal server error",
        message = "Middleware processing failed"
    }))
    ngx.exit(500)
end

--- Run every middleware in the chain, in order.
-- Each middleware runs under pcall; names without a matching function are
-- skipped. A middleware that raises ends the request with a 500 response.
function gateway_middleware.run()
    for _, name in ipairs(middleware_chain) do
        local step = gateway_middleware[name]
        if step then
            local succeeded, failure = pcall(step)
            if not succeeded then
                abort_on_middleware_error(name, failure)
            end
        end
    end
end
return gateway_middleware
2.2 电商平台缓存优化案例
2.2.1 多级缓存架构
-- lua/app/services/product_service.lua - 产品服务缓存优化
local product_service = {}
local cache_manager = require "app.utils.cache_manager"
local mysql = require "resty.mysql"
local cjson = require "cjson"
local logger = require "app.utils.logger"
-- Replace every `?` in `sql` with the quoted product id.
-- lua-resty-mysql's query() has no parameter binding (its second argument is
-- an estimated row count), so the value has to be quoted into the statement.
local function bind_product_id(sql, product_id)
    local quoted = ngx.quote_sql_str(tostring(product_id))
    -- A function replacement keeps "%" in the value from being interpreted.
    return (sql:gsub("%?", function() return quoted end))
end

--- Load one product with images, attributes and related products.
-- The cache is consulted first; on a miss the product is read from MySQL and
-- cached with a TTL that grows with the product's popularity.
-- @param product_id product identifier
-- @treturn table|nil the product, or nil on failure
-- @treturn[opt] string "Database connection failed", "Database query failed"
--   or "Product not found"
function product_service.get_product_detail(product_id)
    if product_id == nil then
        return nil, "Product not found"
    end

    -- 1. Cache lookup.
    local cached_product = cache_manager.get("product_detail", product_id)
    if cached_product then
        logger.debug("Product cache hit: ", product_id)
        return cached_product
    end

    -- 2. Database lookup.
    logger.debug("Product cache miss, querying database: ", product_id)
    local db, new_err = mysql:new()
    if not db then
        logger.error("Database connection failed: ", new_err)
        return nil, "Database connection failed"
    end
    db:set_timeout(5000)
    -- NOTE(review): connection settings and credentials are hard-coded here;
    -- they should come from configuration.
    local ok, err = db:connect({
        host = "127.0.0.1",
        port = 3306,
        database = "ecommerce",
        user = "app_user",
        password = "app_pass"
    })
    if not ok then
        logger.error("Database connection failed: ", err)
        return nil, "Database connection failed"
    end

    -- Basic product data.
    local product_query = [[
SELECT p.id, p.name, p.description, p.price, p.stock_quantity,
p.category_id, p.brand_id, p.status, p.created_at, p.updated_at,
c.name as category_name, b.name as brand_name
FROM products p
LEFT JOIN categories c ON p.category_id = c.id
LEFT JOIN brands b ON p.brand_id = b.id
WHERE p.id = ? AND p.status = 'active'
]]
    local res, query_err = db:query(bind_product_id(product_query, product_id))
    if not res then
        -- A failed query is not the same as "no such product", and the
        -- connection must not go back to the pool after an error.
        logger.error("Product query failed: ", query_err)
        db:close()
        return nil, "Database query failed"
    end
    if #res == 0 then
        db:set_keepalive(10000, 100)
        return nil, "Product not found"
    end
    local product = res[1]

    -- Secondary queries are best effort. After the first failure the
    -- connection is considered unusable and the remaining ones are skipped.
    local complete = true
    local function fetch(sql, what)
        if not complete then
            return nil
        end
        local rows, fetch_err = db:query(bind_product_id(sql, product_id))
        if not rows then
            complete = false
            logger.error("Product ", what, " query failed: ", fetch_err)
        end
        return rows
    end

    -- Images.
    local images_res = fetch([[
SELECT url, alt_text, sort_order
FROM product_images
WHERE product_id = ?
ORDER BY sort_order
]], "images")
    product.images = images_res or {}

    -- Attributes, as a name -> value map.
    local attributes_res = fetch([[
SELECT pa.attribute_name, pa.attribute_value
FROM product_attributes pa
WHERE pa.product_id = ?
]], "attributes")
    product.attributes = {}
    if attributes_res then
        for _, attr in ipairs(attributes_res) do
            product.attributes[attr.attribute_name] = attr.attribute_value
        end
    end

    -- Related products.
    local related_res = fetch([[
SELECT rp.related_product_id, p.name, p.price, pi.url as image_url
FROM related_products rp
JOIN products p ON rp.related_product_id = p.id
LEFT JOIN product_images pi ON p.id = pi.product_id AND pi.sort_order = 1
WHERE rp.product_id = ? AND p.status = 'active'
LIMIT 10
]], "related products")
    product.related_products = related_res or {}

    if not complete then
        -- Return what we have, but do not cache an incomplete product.
        db:close()
        return product
    end
    db:set_keepalive(10000, 100)

    -- 3. Cache the product; hotter products are cached for longer.
    local base_ttl = 3600 -- 1 hour
    local popularity_score = product_service.get_popularity_score(product_id)
    local ttl = base_ttl
    if popularity_score > 1000 then
        ttl = base_ttl * 4 -- hot products: 4 hours
    elseif popularity_score > 100 then
        ttl = base_ttl * 2 -- moderately popular products: 2 hours
    end
    cache_manager.set("product_detail", product_id, product, ttl)
    logger.info("Product cached with TTL ", ttl, ": ", product_id)

    return product
end
--- Compute a popularity score from the counters kept in Redis.
-- score = views in the last 24 hours + sales in the last 7 days * 10.
-- Missing counters and Redis failures count as 0.
-- @param product_id product identifier
-- @treturn number the score (0 when Redis is unreachable)
function product_service.get_popularity_score(product_id)
    local redis = require "resty.redis"
    local red = redis:new()
    red:set_timeout(1000)

    local ok = red:connect("127.0.0.1", 6379)
    if not ok then
        return 0
    end

    -- Views in the last 24 hours, then sales in the last 7 days.
    local views, views_err = red:get("product:views:24h:" .. product_id)
    local sales, sales_err
    if views then
        sales, sales_err = red:get("product:sales:7d:" .. product_id)
    end

    if views and sales then
        red:set_keepalive(10000, 100)
    else
        -- Do not pool a connection after a failed command.
        logger.warn("Failed to read popularity counters: ", views_err or sales_err)
        red:close()
    end

    -- A missing key comes back as ngx.null (truthy userdata), so `x or 0`
    -- would not catch it; tonumber() maps ngx.null, nil and false to nil.
    return (tonumber(views) or 0) + (tonumber(sales) or 0) * 10
end
--- Load several products, using the cache where possible.
-- @param product_ids array of product identifiers
-- @treturn table products keyed by the identifiers exactly as passed in;
--   ids that were not found are absent
function product_service.get_products_batch(product_ids)
    local products = {}
    local cache_misses = {}

    -- 1. Cache lookups.
    for _, product_id in ipairs(product_ids) do
        local cached_product = cache_manager.get("product_detail", product_id)
        if cached_product then
            products[product_id] = cached_product
        else
            cache_misses[#cache_misses + 1] = product_id
        end
    end

    if #cache_misses == 0 then
        return products
    end

    -- 2. One database query for everything the cache did not have.
    local db_products = product_service.query_products_batch(cache_misses)

    -- 3. Merge and cache. query_products_batch keys its rows by
    -- tostring(id); map them back to the caller's ids so that cached and
    -- freshly loaded products use the same key type in the result.
    -- NOTE(review): these rows carry only the basic columns (no images,
    -- attributes or related products) yet are cached under "product_detail",
    -- the cache type get_product_detail reads - confirm this is intended.
    for _, product_id in ipairs(cache_misses) do
        local product = db_products[tostring(product_id)]
        if product then
            products[product_id] = product

            local popularity_score = product_service.get_popularity_score(product_id)
            local ttl = 3600
            if popularity_score > 1000 then
                ttl = 14400
            elseif popularity_score > 100 then
                ttl = 7200
            end
            cache_manager.set("product_detail", product_id, product, ttl)
        end
    end

    return products
end
--- Load the basic data of several active products with one query.
-- @param product_ids array of product identifiers
-- @treturn table rows keyed by `tostring(row.id)`; empty on any failure
function product_service.query_products_batch(product_ids)
    if #product_ids == 0 then
        return {}
    end

    local db, new_err = mysql:new()
    if not db then
        logger.error("Database connection failed: ", new_err)
        return {}
    end
    db:set_timeout(5000)
    -- NOTE(review): connection settings and credentials are hard-coded here;
    -- they should come from configuration.
    local ok, err = db:connect({
        host = "127.0.0.1",
        port = 3306,
        database = "ecommerce",
        user = "app_user",
        password = "app_pass"
    })
    if not ok then
        logger.error("Database connection failed: ", err)
        return {}
    end

    -- Build the IN list. lua-resty-mysql's query() has no `?` placeholders
    -- (its second argument is an estimated row count), so every id is quoted
    -- into the statement text.
    local quoted_ids = {}
    for index, product_id in ipairs(product_ids) do
        quoted_ids[index] = ngx.quote_sql_str(tostring(product_id))
    end

    local query = string.format([[
SELECT p.id, p.name, p.description, p.price, p.stock_quantity,
p.category_id, p.brand_id, p.status, p.created_at, p.updated_at,
c.name as category_name, b.name as brand_name
FROM products p
LEFT JOIN categories c ON p.category_id = c.id
LEFT JOIN brands b ON p.brand_id = b.id
WHERE p.id IN (%s) AND p.status = 'active'
]], table.concat(quoted_ids, ","))

    local res, query_err = db:query(query)
    if not res then
        logger.error("Batch query failed: ", query_err)
        -- Do not pool a connection after a failed query.
        db:close()
        return {}
    end
    db:set_keepalive(10000, 100)

    -- Index the rows by id.
    local products = {}
    for _, row in ipairs(res) do
        products[tostring(row.id)] = row
    end
    return products
end
--- Search products, caching each result page for 5 minutes.
-- @param query search text (may be nil)
-- @param[opt] filters table of filter name -> value
-- @param[opt] page page number, defaults to 1
-- @param[opt] per_page page size, defaults to 20
-- @return the search result from the cache or from `execute_search`
function product_service.search_products(query, filters, page, per_page)
    page = page or 1
    per_page = per_page or 20

    -- Build the cache key. Filter names are sorted first: pairs() visits
    -- keys in no particular order, so identical filters could otherwise
    -- produce different keys and miss the cache.
    local cache_key_parts = { query or "" }
    if filters then
        local names = {}
        for name in pairs(filters) do
            names[#names + 1] = name
        end
        table.sort(names, function(a, b)
            return tostring(a) < tostring(b)
        end)
        for _, name in ipairs(names) do
            cache_key_parts[#cache_key_parts + 1] =
                tostring(name) .. ":" .. tostring(filters[name])
        end
    end
    cache_key_parts[#cache_key_parts + 1] = "page:" .. page
    cache_key_parts[#cache_key_parts + 1] = "per_page:" .. per_page
    local cache_key = table.concat(cache_key_parts, "|")

    -- Cache lookup.
    local cached_result = cache_manager.get("product_search", cache_key)
    if cached_result then
        logger.debug("Search cache hit: ", cache_key)
        return cached_result
    end

    -- Run the search.
    local search_result = product_service.execute_search(query, filters, page, per_page)

    -- Cache with a short TTL because search results change quickly. A nil
    -- result (failed search) is not cached: it would be stored as JSON
    -- `null` and come back as a truthy "hit".
    if search_result ~= nil then
        cache_manager.set("product_search", cache_key, search_result, 300) -- 5 minutes
    end

    return search_result
end
--- Pre-populate the cache with the 100 hottest products.
-- Each product is loaded through get_product_detail(), which caches it as a
-- side effect; the number of successfully loaded products is logged.
function product_service.warm_up_cache()
    logger.info("Starting cache warm-up...")

    local warmed_count = 0
    for _, product_id in ipairs(product_service.get_hot_products(100)) do
        if product_service.get_product_detail(product_id) then
            warmed_count = warmed_count + 1
        end
    end

    logger.info("Cache warm-up completed, warmed ", warmed_count, " products")
end
return product_service
2.3 实时数据处理案例
2.3.1 实时统计系统
-- lua/app/services/analytics_service.lua - 实时分析服务
local analytics_service = {}
local redis = require "resty.redis"
local cjson = require "cjson"
local logger = require "app.utils.logger"
-- Supported time windows and their length in seconds.
local TIME_WINDOWS = {
    minute = 60,
    hour = 3600,  -- 60 * 60
    day = 86400,  -- 24 * 60 * 60
}
--- Record one page view in Redis.
-- Updates total and per-window page-view counters (overall and per page),
-- unique-visitor and session sets, and the hourly hot-page ranking, all in
-- one pipeline.
-- @param page_url URL of the viewed page (required)
-- @param[opt] user_id visitor id, for unique-visitor counting
-- @param[opt] session_id session id, for session counting
-- @treturn boolean true on success, false on any failure
function analytics_service.track_page_view(page_url, user_id, session_id)
    if not page_url then
        logger.error("track_page_view called without page_url")
        return false
    end

    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        logger.error("Redis connection failed: ", err)
        return false
    end

    local timestamp = ngx.time()
    local hour_key = math.floor(timestamp / 3600) * 3600
    -- Window buckets with their retention. Every windowed key gets a TTL;
    -- without one the per-minute and per-hour counters would pile up in
    -- Redis forever.
    local windows = {
        { name = "minute", bucket = math.floor(timestamp / 60) * 60, ttl = 3600 },       -- keep 1 hour
        { name = "hour",   bucket = hour_key,                        ttl = 86400 * 2 },  -- keep 2 days
        { name = "day",    bucket = math.floor(timestamp / 86400) * 86400, ttl = 86400 * 30 }, -- keep 30 days
    }
    local page_hash = ngx.md5(page_url)

    -- One pipeline for all commands.
    red:init_pipeline()

    -- All-time counters (never expire).
    red:incr("pv:total")
    red:incr("pv:page:" .. page_hash .. ":total")

    for _, window in ipairs(windows) do
        local suffix = window.name .. ":" .. window.bucket

        -- Page views, overall and per page.
        local pv_key = "pv:" .. suffix
        red:incr(pv_key)
        red:expire(pv_key, window.ttl)
        local page_key = "pv:page:" .. page_hash .. ":" .. suffix
        red:incr(page_key)
        red:expire(page_key, window.ttl)

        -- Unique visitors.
        if user_id then
            local uv_key = "uv:" .. suffix
            red:sadd(uv_key, user_id)
            red:expire(uv_key, window.ttl)
        end

        -- Sessions.
        if session_id then
            local sessions_key = "sessions:" .. suffix
            red:sadd(sessions_key, session_id)
            red:expire(sessions_key, window.ttl)
        end
    end

    -- Hot-page ranking, one sorted set per hour bucket.
    red:zincrby("hot_pages:" .. hour_key, 1, page_url)
    red:expire("hot_pages:" .. hour_key, 86400)

    local results, commit_err = red:commit_pipeline()
    if not results then
        logger.error("Pipeline commit failed: ", commit_err)
        -- Do not pool a connection after a failed pipeline.
        red:close()
        return false
    end

    red:set_keepalive(10000, 100)
    return true
end
--- Record a user behaviour event in Redis.
-- Stores the event (kept 7 days), bumps the hourly counter for its type
-- (kept 2 days) and, when a user id is given, prepends it to that user's
-- list of the 100 most recent events (kept 30 days).
-- @param event_type event type name
-- @param event_data event payload (must be JSON-encodable)
-- @param[opt] user_id id of the acting user
-- @treturn boolean true on success, false on any Redis failure
function analytics_service.track_event(event_type, event_data, user_id)
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        logger.error("Redis connection failed: ", err)
        return false
    end

    local timestamp = ngx.time()
    local event_record = {
        type = event_type,
        data = event_data,
        user_id = user_id,
        timestamp = timestamp,
        ip = ngx.var.remote_addr,
        user_agent = ngx.var.http_user_agent
    }

    -- Send everything in one pipeline instead of one round trip per command.
    red:init_pipeline()

    -- Event record.
    -- NOTE(review): the key has one-second resolution, so two events of the
    -- same type within the same second overwrite each other.
    local event_key = "events:" .. event_type .. ":" .. timestamp
    red:setex(event_key, 86400 * 7, cjson.encode(event_record)) -- keep 7 days

    -- Hourly counter per event type.
    local hour_key = math.floor(timestamp / 3600) * 3600
    local count_key = "event_count:" .. event_type .. ":hour:" .. hour_key
    red:incr(count_key)
    red:expire(count_key, 86400 * 2)

    -- Per-user event history.
    if user_id then
        local user_events_key = "user_events:" .. user_id
        red:lpush(user_events_key, cjson.encode({
            type = event_type,
            timestamp = timestamp,
            data = event_data
        }))
        red:ltrim(user_events_key, 0, 99) -- keep only the latest 100 events
        red:expire(user_events_key, 86400 * 30) -- expire after 30 days
    end

    local results, commit_err = red:commit_pipeline()
    if not results then
        logger.error("Pipeline commit failed: ", commit_err)
        -- Do not pool a connection after a failed pipeline.
        red:close()
        return false
    end

    red:set_keepalive(10000, 100)
    return true
end
--- Read the statistics of the current time window.
-- @param[opt] time_window "minute", "hour" or "day"; anything else (or nil)
--   is treated as "hour"
-- @treturn table|nil stats with `time_window`, `window_start`, `page_views`,
--   `unique_visitors`, `sessions` and `hot_pages`; nil on Redis failure
function analytics_service.get_realtime_stats(time_window)
    -- Fall back to "hour" for unknown names so the key name and the bucket
    -- size always belong to the same window.
    if not TIME_WINDOWS[time_window] then
        time_window = "hour"
    end

    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        logger.error("Redis connection failed: ", err)
        return nil
    end

    local timestamp = ngx.time()
    local window_size = TIME_WINDOWS[time_window]
    local window_key = math.floor(timestamp / window_size) * window_size
    -- The hot-page ranking is stored per hour bucket only, whatever window
    -- was requested, so it is always read with the current hour's key.
    local hot_pages_key = "hot_pages:" .. math.floor(timestamp / 3600) * 3600

    red:init_pipeline()
    red:get("pv:" .. time_window .. ":" .. window_key)         -- page views
    red:scard("uv:" .. time_window .. ":" .. window_key)       -- unique visitors
    red:scard("sessions:" .. time_window .. ":" .. window_key) -- sessions
    red:zrevrange(hot_pages_key, 0, 9, "withscores")           -- top 10 pages
    local results, commit_err = red:commit_pipeline()

    if not results then
        logger.error("Pipeline commit failed: ", commit_err)
        -- Do not pool a connection after a failed pipeline.
        red:close()
        return nil
    end
    red:set_keepalive(10000, 100)

    -- The ranking arrives as a flat { member, score, member, score, ... }
    -- array. A command that failed inside the pipeline is reported as
    -- { false, err } and is skipped here.
    local hot_pages = {}
    local hot_pages_raw = results[4]
    if type(hot_pages_raw) == "table" and hot_pages_raw[1] ~= false then
        for i = 1, #hot_pages_raw, 2 do
            hot_pages[#hot_pages + 1] = {
                url = hot_pages_raw[i],
                views = tonumber(hot_pages_raw[i + 1]) or 0
            }
        end
    end

    return {
        time_window = time_window,
        window_start = window_key,
        page_views = tonumber(results[1]) or 0,
        unique_visitors = tonumber(results[2]) or 0,
        sessions = tonumber(results[3]) or 0,
        hot_pages = hot_pages
    }
end
--- Return a user's most recent behaviour events, newest first.
-- Entries that are not valid JSON are skipped.
-- @param user_id id of the user (required)
-- @param[opt] limit maximum number of events, defaults to 50
-- @treturn table|nil array of decoded events, or nil on failure
-- @treturn[opt] string error message when the first value is nil
function analytics_service.get_user_behavior(user_id, limit)
    if user_id == nil then
        return nil, "user_id is required"
    end
    limit = limit or 50

    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return nil, err
    end

    local user_events_key = "user_events:" .. user_id
    local events_raw, range_err = red:lrange(user_events_key, 0, limit - 1)
    if not events_raw then
        -- Do not pool a connection after a failed command.
        red:close()
        return nil, range_err
    end
    red:set_keepalive(10000, 100)

    local events = {}
    for _, event_json in ipairs(events_raw) do
        local success, event = pcall(cjson.decode, event_json)
        if success then
            events[#events + 1] = event
        end
    end
    return events
end
return analytics_service
3. 性能调优案例
3.1 高并发优化
3.1.1 Nginx配置优化
# High-performance nginx configuration
user nginx;
worker_processes auto;
worker_cpu_affinity auto;
worker_rlimit_nofile 100000;

# Error log level
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;

events {
    worker_connections 4096;
    use epoll;
    multi_accept on;
    accept_mutex off;
}

http {
    # Basics
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    # Transfer tuning
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;

    # Keep-alive connections
    keepalive_timeout 65;
    keepalive_requests 1000;

    # Request buffers
    client_body_buffer_size 128k;
    client_max_body_size 10m;
    client_header_buffer_size 32k;
    large_client_header_buffers 4 32k;

    # Output buffering
    output_buffers 2 32k;
    postpone_output 1460;

    # Compression
    gzip on;
    gzip_vary on;
    gzip_min_length 1000;
    gzip_comp_level 6;
    gzip_proxied any;
    # RSS feeds are served as application/rss+xml; "application/xml+rss"
    # is not a registered type and never matches a response.
    gzip_types
        text/plain
        text/css
        text/xml
        text/javascript
        application/json
        application/javascript
        application/rss+xml
        application/atom+xml
        image/svg+xml;

    # Open file cache
    open_file_cache max=10000 inactive=60s;
    open_file_cache_valid 30s;
    open_file_cache_min_uses 2;
    open_file_cache_errors on;

    # Lua settings
    # NOTE(review): the project layout shown at the top of this guide lives
    # under /opt/openresty-app/ - confirm this path matches the deployment.
    lua_package_path '/opt/app/lua/?.lua;/opt/app/lua/?/init.lua;;';
    lua_code_cache on;
    lua_max_pending_timers 8192;
    lua_max_running_timers 4096;

    # Shared memory zones
    lua_shared_dict cache 512m;
    lua_shared_dict locks 10m;
    lua_shared_dict rate_limit 50m;
    lua_shared_dict sessions 100m;
    lua_shared_dict metrics 20m;
    lua_shared_dict upstream 10m;

    # Access log format (buffered writes)
    log_format main '$remote_addr - $remote_user [$time_local] '
                    '"$request" $status $body_bytes_sent '
                    '"$http_referer" "$http_user_agent" '
                    '$request_time $upstream_response_time '
                    '$connection $connection_requests';
    access_log /var/log/nginx/access.log main buffer=64k flush=5s;
}
3.1.2 Lua性能优化
-- lua/app/utils/performance.lua - 性能优化工具
local performance = {}
local ffi = require "ffi"
-- FFI declarations for gettimeofday(2).
-- NOTE(review): both fields are declared as `long`, which assumes the
-- platform's struct timeval uses that layout - confirm for non-Linux targets.
ffi.cdef[[
typedef struct {
long tv_sec;
long tv_usec;
} timeval;
int gettimeofday(timeval *tv, void *tz);
]]
-- Scratch struct shared by all calls. get_time() never yields between
-- filling and reading it, so reuse avoids one cdata allocation per call.
local time_buf = ffi.new("timeval")

--- High-resolution wall-clock time.
-- @return seconds since the epoch as a number with microsecond resolution
function performance.get_time()
    ffi.C.gettimeofday(time_buf, nil)
    return tonumber(time_buf.tv_sec) + tonumber(time_buf.tv_usec) / 1000000
end
--- Wrap a function so that every call is timed and reported.
-- The wrapper records the duration through
-- prometheus.record_function_duration and logs a warning when a call takes
-- longer than one second.
-- @param func_name label used in the metric and in the warning
-- @param func function to wrap
-- @return wrapper with the same arguments and return values as func
function performance.monitor(func_name, func)
    -- Capture the results together with their exact count. `{f()}` followed
    -- by `unpack(t)` relies on `#t`, which is unspecified when results
    -- contain nil (for example `nil, err`) and can silently drop values.
    local function capture(...)
        return select("#", ...), {...}
    end

    return function(...)
        local start_time = performance.get_time()
        local result_count, results = capture(func(...))
        local duration = performance.get_time() - start_time

        -- Record the metric
        local prometheus = require "app.utils.prometheus"
        prometheus.record_function_duration(func_name, duration)

        -- Warn about slow calls (more than one second)
        if duration > 1.0 then
            local logger = require "app.utils.logger"
            logger.warn("Slow function detected: ", func_name, ", duration: ", duration)
        end

        return unpack(results, 1, result_count)
    end
end
--- Start a Lua-heap measurement.
-- Snapshots collectgarbage("count") and returns a function that, when
-- called, reports how many KB the heap has grown since the snapshot and
-- logs a warning when the growth exceeds 1024 KB.
-- @return function returning the heap delta in KB
function performance.monitor_memory()
    local baseline_kb = collectgarbage("count")

    local function finish()
        local current_kb = collectgarbage("count")
        local delta_kb = current_kb - baseline_kb
        if delta_kb > 1024 then -- more than 1 MB
            local logger = require "app.utils.logger"
            logger.warn("High memory usage detected: ", delta_kb, "KB")
        end
        return delta_kb
    end

    return finish
end
-- Object pools keyed by name; used to recycle objects and ease GC pressure.
local object_pools = {}

--- Register (or replace) a named object pool.
-- @param name pool identifier
-- @param factory function called to build an object when the pool is empty
-- @param max_size maximum number of idle objects kept (default 100)
function performance.create_pool(name, factory, max_size)
    local pool_info = {
        factory = factory,
        pool = {},
        max_size = max_size or 100,
        created = 0,
        reused = 0
    }
    object_pools[name] = pool_info
end
--- Take an object from a named pool.
-- Pops an idle object when one is available, otherwise builds a new one
-- with the pool's factory. The pool's reused/created counter is updated.
-- @param name pool identifier
-- @return the object, or nil when no pool with that name exists
function performance.get_from_pool(name)
    local entry = object_pools[name]
    if not entry then
        return nil
    end

    if #entry.pool == 0 then
        entry.created = entry.created + 1
        return entry.factory()
    end

    entry.reused = entry.reused + 1
    return table.remove(entry.pool)
end
--- Give an object back to a named pool.
-- Calls obj:reset() first when the object is a table with a reset field.
-- @param name pool identifier
-- @param obj object to recycle
-- @return true when the object was stored; false when obj is nil, the pool
--   does not exist, or the pool is already at max_size
function performance.return_to_pool(name, obj)
    -- Inserting nil stores nothing, so do not report it as pooled.
    if obj == nil then
        return false
    end

    local pool_info = object_pools[name]
    if not pool_info or #pool_info.pool >= pool_info.max_size then
        return false
    end

    -- Reset object state before it is reused
    if type(obj) == "table" and obj.reset then
        obj:reset()
    end

    pool_info.pool[#pool_info.pool + 1] = obj
    return true
end
--- Run a list of operations in fixed-size batches, pausing between batches.
-- Each batch is handed to an executor that must return an array of results.
-- The executor is, in order of preference: the `executor` argument,
-- `performance.execute_batch` when the application defines it, or a built-in
-- default that calls every operation as a function. This module does not
-- define `performance.execute_batch` itself, so without the fallback every
-- call failed on a nil field.
-- @param operations array of operations
-- @param batch_size operations per batch (default 100; values below 1 or
--   non-numeric values also fall back to 100)
-- @param executor optional function(batch) returning an array of results
-- @return array with the results of all batches, in order
function performance.batch_execute(operations, batch_size, executor)
    operations = operations or {}
    batch_size = math.floor(tonumber(batch_size) or 100)
    -- A zero or negative step would make the loop below misbehave.
    if batch_size < 1 then
        batch_size = 100
    end

    -- Default executor: call each operation and keep its first result.
    -- Operations returning nil contribute no entry.
    local function run_operations(batch)
        local out = {}
        for idx = 1, #batch do
            local op = batch[idx]
            if type(op) ~= "function" then
                error("batch_execute: operation is not callable; pass an executor")
            end
            out[#out + 1] = op()
        end
        return out
    end
    executor = executor or performance.execute_batch or run_operations

    local results = {}
    local total = #operations
    for i = 1, total, batch_size do
        local batch = {}
        for j = i, math.min(i + batch_size - 1, total) do
            batch[#batch + 1] = operations[j]
        end

        -- Execute the batch and append its results
        local batch_results = executor(batch) or {}
        for _, result in ipairs(batch_results) do
            results[#results + 1] = result
        end

        -- Yield for 1 ms when more batches remain.
        -- NOTE(review): ngx.sleep is only usable in phases that may yield -
        -- confirm the calling context.
        if i + batch_size <= total then
            ngx.sleep(0.001)
        end
    end
    return results
end
return performance
3.2 内存优化案例
-- lua/app/utils/memory_optimizer.lua - 内存优化
local memory_optimizer = {}
local logger = require "app.utils.logger"
-- Lua heap thresholds in KB, the unit reported by collectgarbage("count").
local KB_PER_MB = 1024
local MEMORY_THRESHOLDS = {
    warning = 100 * KB_PER_MB,  -- 100 MB
    critical = 200 * KB_PER_MB  -- 200 MB
}
--- Check the Lua heap size and trigger a cleanup when it is too large.
-- Above the critical threshold an emergency cleanup runs; above the warning
-- threshold a gentle cleanup runs.
-- @return current Lua heap size in KB (measured before any cleanup)
function memory_optimizer.monitor_memory_usage()
    local usage_kb = collectgarbage("count")

    if usage_kb > MEMORY_THRESHOLDS.critical then
        logger.error("Critical memory usage: ", usage_kb, "KB")
        memory_optimizer.emergency_cleanup()
        return usage_kb
    end

    if usage_kb > MEMORY_THRESHOLDS.warning then
        logger.warn("High memory usage: ", usage_kb, "KB")
        memory_optimizer.gentle_cleanup()
    end

    return usage_kb
end
--- Low-impact cleanup: drop expired cache entries, then run one
-- incremental garbage-collection step.
function memory_optimizer.gentle_cleanup()
    local gc_step_size = 200

    -- Expired cache entries first
    memory_optimizer.cleanup_expired_cache()

    -- Then a single incremental GC step
    collectgarbage("step", gc_step_size)
end
--- Aggressive cleanup: flush every cleanable cache, force a full garbage
-- collection and log the resulting Lua heap size.
function memory_optimizer.emergency_cleanup()
    logger.info("Starting emergency memory cleanup...")

    -- Flush all cleanable caches
    memory_optimizer.cleanup_all_cache()

    -- Full collection cycle
    collectgarbage("collect")

    logger.info("Emergency cleanup completed, memory: ", collectgarbage("count"), "KB")
end
--- Remove expired entries from the `cache` shared dictionary.
-- Uses the dictionary's own flush_expired() instead of listing keys and
-- probing each one: get_keys() does not report expired items, so the
-- previous scan could not find the entries it was meant to delete.
-- At most 1000 entries are flushed per call to bound the time the
-- dictionary stays locked.
function memory_optimizer.cleanup_expired_cache()
    local cache_dict = ngx.shared.cache
    if not cache_dict then
        return
    end

    local cleaned = cache_dict:flush_expired(1000) or 0
    logger.debug("Cleaned ", cleaned, " expired cache entries")
end
--- Empty the `cache`, `sessions` and `rate_limit` shared dictionaries.
-- Dictionaries that are not configured are skipped.
function memory_optimizer.cleanup_all_cache()
    local shared_dicts = {"cache", "sessions", "rate_limit"}
    for _, dict_name in ipairs(shared_dicts) do
        local dict = ngx.shared[dict_name]
        if dict then
            -- flush_all() only marks every item as expired; flush_expired()
            -- is what actually releases the memory they occupy.
            dict:flush_all()
            dict:flush_expired()
            logger.debug("Flushed shared dict: ", dict_name)
        end
    end
end
--- Build a memory usage report.
-- @return table with `lua_memory_kb` (Lua heap size in KB) and
--   `shared_dicts`, mapping each configured dictionary name to its
--   capacity, free_space and used_space as reported by the dictionary
function memory_optimizer.get_memory_report()
    local report = {
        lua_memory_kb = collectgarbage("count"),
        shared_dicts = {}
    }

    local shared_dicts = {"cache", "sessions", "rate_limit", "metrics"}
    for _, dict_name in ipairs(shared_dicts) do
        local dict = ngx.shared[dict_name]
        if dict then
            -- Read each figure once so the three fields always agree, even
            -- if another worker changes the dictionary in between.
            local capacity = dict:capacity()
            local free_space = dict:free_space()
            report.shared_dicts[dict_name] = {
                capacity = capacity,
                free_space = free_space,
                used_space = capacity - free_space
            }
        end
    end
    return report
end
return memory_optimizer
4. 故障排查与调试
4.1 常见问题诊断
4.1.1 性能问题诊断
-- lua/app/utils/diagnostics.lua - 诊断工具
local diagnostics = {}
local cjson = require "cjson"
local logger = require "app.utils.logger"
--- Run all performance checks and collect them into one report.
-- Covers the Lua heap size, usage of the `cache`, `sessions` and
-- `rate_limit` shared dictionaries, connection pools and cache hit rate.
-- @return table with `timestamp` and a `checks` table
function diagnostics.performance_check()
    local started_at = ngx.time()
    local memory_limit_kb = 100 * 1024
    local checks = {}

    -- 1. Lua heap size
    local lua_kb = collectgarbage("count")
    local memory_status = "warning"
    if lua_kb < memory_limit_kb then
        memory_status = "ok"
    end
    checks.memory = {
        status = memory_status,
        value = lua_kb,
        unit = "KB",
        threshold = memory_limit_kb
    }

    -- 2. Shared dictionary usage (warning at 80 percent or more)
    local dict_checks = {}
    for _, dict_name in ipairs({"cache", "sessions", "rate_limit"}) do
        local dict = ngx.shared[dict_name]
        if dict then
            local usage_percent = (dict:capacity() - dict:free_space()) / dict:capacity() * 100
            local dict_status = "warning"
            if usage_percent < 80 then
                dict_status = "ok"
            end
            dict_checks[dict_name] = {
                status = dict_status,
                usage_percent = usage_percent,
                capacity = dict:capacity(),
                free_space = dict:free_space()
            }
        end
    end
    checks.shared_memory = dict_checks

    -- 3. Connection pools
    checks.connection_pools = diagnostics.check_connection_pools()

    -- 4. Cache hit rate
    checks.cache_hit_rate = diagnostics.check_cache_hit_rate()

    return {
        timestamp = started_at,
        checks = checks
    }
end
--- Report connection pool status.
-- Placeholder: the figures below are fixed sample values, not measurements.
-- Replace them with data from the real pool implementation.
-- @return table with `mysql` and `redis` pool descriptions
function diagnostics.check_connection_pools()
    local function pool_snapshot(active, idle, max)
        return {
            status = "ok",
            active_connections = active,
            idle_connections = idle,
            max_connections = max
        }
    end

    return {
        mysql = pool_snapshot(5, 15, 100),
        redis = pool_snapshot(3, 27, 50)
    }
end
--- Compute the cache hit rate from the `metrics` shared dictionary.
-- Reads the `cache_hits` and `cache_misses` counters; a rate above
-- 80 percent is reported as "ok", anything else as "warning".
-- @return table with status and figures, or status "unknown" with a message
--   when the dictionary is missing or no operations were recorded
function diagnostics.check_cache_hit_rate()
    local metrics = ngx.shared.metrics
    if not metrics then
        return {status = "unknown", message = "Metrics not available"}
    end

    local hits = metrics:get("cache_hits") or 0
    local misses = metrics:get("cache_misses") or 0
    local total = hits + misses
    if total == 0 then
        return {status = "unknown", message = "No cache operations recorded"}
    end

    local hit_rate = hits / total * 100
    local status = "warning"
    if hit_rate > 80 then
        status = "ok"
    end

    return {
        status = status,
        hit_rate = hit_rate,
        hits = hits,
        misses = misses,
        total = total
    }
end
--- List queries slower than a threshold.
-- Example implementation: it filters a fixed sample list; a real version
-- would read from logs or monitoring data.
-- @param threshold duration in seconds (default 1.0)
-- @return array of {sql, duration} records whose duration exceeds threshold
function diagnostics.detect_slow_queries(threshold)
    local limit = threshold or 1.0 -- one second

    -- Sample data standing in for a real query log
    local sample_queries = {
        {sql = "SELECT * FROM users WHERE email = ?", duration = 1.5},
        {sql = "SELECT * FROM products WHERE category_id = ?", duration = 0.8},
        {sql = "SELECT COUNT(*) FROM orders WHERE created_at > ?", duration = 2.1}
    }

    local slow = {}
    for idx = 1, #sample_queries do
        local candidate = sample_queries[idx]
        if candidate.duration > limit then
            slow[#slow + 1] = candidate
        end
    end
    return slow
end
--- Read error counters for the current hourly bucket from Redis.
-- Counters are read from keys of the form "errors:<type>:<hour_start>".
-- @param time_window kept for interface compatibility; only the current
--   hourly bucket is read, so this value does not affect the result
-- @return table mapping each error type ("4xx", "5xx", "timeout",
--   "connection_error") to a number; nil plus a message when Redis is
--   unreachable
function diagnostics.get_error_stats(time_window)
    local redis = require "resty.redis"
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return nil, "Redis connection failed: " .. tostring(err)
    end

    local hour_bucket = math.floor(ngx.time() / 3600) * 3600
    local error_types = {"4xx", "5xx", "timeout", "connection_error"}
    local error_stats = {}
    for _, error_type in ipairs(error_types) do
        local count = red:get("errors:" .. error_type .. ":" .. hour_bucket)
        -- A missing key comes back as ngx.null, which is truthy, so
        -- `count or 0` never applied; default after the conversion instead.
        error_stats[error_type] = tonumber(count) or 0
    end

    red:set_keepalive(10000, 100)
    return error_stats
end
return diagnostics
4.2 调试工具
-- lua/app/utils/debugger.lua - 调试工具
local debugger = {}
local cjson = require "cjson"
local logger = require "app.utils.logger"
-- Debug switch: enabled only when the DEBUG_MODE environment variable is
-- exactly "true". Read once, when the module is loaded.
-- NOTE(review): nginx clears inherited environment variables unless they
-- are listed with an `env` directive - confirm the config declares it.
local debug_mode_env = os.getenv("DEBUG_MODE")
local DEBUG_MODE = (debug_mode_env == "true")
--- Write a structured debug record to the log (debug mode only).
-- The record contains the message, the payload and the current request's
-- id, URI and method, encoded as JSON.
-- @param message short description
-- @param data optional payload
function debugger.debug_log(message, data)
    if not DEBUG_MODE then
        return
    end

    local debug_info = {
        timestamp = ngx.time(),
        message = message,
        data = data,
        request_id = ngx.var.request_id,
        uri = ngx.var.uri,
        method = ngx.var.request_method
    }

    -- cjson.encode raises on values it cannot serialize (functions,
    -- userdata, ...). A debug helper must not abort the request, so fall
    -- back to a record without the payload.
    local ok, encoded = pcall(cjson.encode, debug_info)
    if not ok then
        debug_info.data = "<unserializable: " .. tostring(encoded) .. ">"
        ok, encoded = pcall(cjson.encode, debug_info)
        if not ok then
            encoded = tostring(message)
        end
    end

    logger.debug("[DEBUG] ", encoded)
end
--- Wrap a function so that its calls are logged in debug mode.
-- Outside debug mode the original function is returned unchanged.
-- @param func_name label used in the log records
-- @param func function to wrap
-- @return func itself, or a wrapper that logs arguments, results and
--   duration and returns exactly what func returned
function debugger.trace_function(func_name, func)
    if not DEBUG_MODE then
        return func
    end

    -- Capture the results together with their exact count; `unpack(t)`
    -- alone relies on `#t`, which is unspecified when results contain nil
    -- (for example `nil, err`) and can drop values.
    local function capture(...)
        return select("#", ...), {...}
    end

    return function(...)
        debugger.debug_log("Function called: " .. func_name, {args = {...}})

        local start_time = ngx.now()
        local result_count, results = capture(func(...))
        local duration = ngx.now() - start_time

        debugger.debug_log("Function completed: " .. func_name, {
            duration = duration,
            results = results
        })
        return unpack(results, 1, result_count)
    end
end
--- Log a debug record when a watched value changes (debug mode only).
-- The JSON form of the last seen value is kept in the `cache` shared
-- dictionary under "debug:watch:<name>" and compared on every call.
-- @param name watch identifier
-- @param value current value; must be JSON-encodable
function debugger.watch_variable(name, value)
    if not DEBUG_MODE then
        return
    end

    local watch_key = "debug:watch:" .. name
    local shared_dict = ngx.shared.cache
    if not shared_dict then
        return
    end

    local previous_json = shared_dict:get(watch_key)
    local current_json = cjson.encode(value)
    if previous_json == current_json then
        return
    end

    debugger.debug_log("Variable changed: " .. name, {
        old_value = previous_json,
        new_value = value
    })
    shared_dict:set(watch_key, current_json)
end
--- Record request details at the start of a request (debug mode only).
-- Stores the record in ngx.ctx.trace_data and logs it.
-- NOTE(review): all request headers are logged as received, which may
-- include credentials such as Authorization or Cookie values.
function debugger.trace_request()
    if not DEBUG_MODE then
        return
    end

    local var = ngx.var
    local trace_data = {}
    trace_data.request_id = var.request_id
    trace_data.method = var.request_method
    trace_data.uri = var.uri
    trace_data.args = var.args
    trace_data.headers = ngx.req.get_headers()
    trace_data.start_time = ngx.now()
    trace_data.remote_addr = var.remote_addr
    trace_data.user_agent = var.http_user_agent

    ngx.ctx.trace_data = trace_data
    debugger.debug_log("Request started", trace_data)
end
--- Complete and log the request trace (debug mode only).
-- Does nothing unless a trace record is present in ngx.ctx.trace_data.
-- Adds end time, duration, status and response headers to that record.
function debugger.trace_response()
    local trace_data = DEBUG_MODE and ngx.ctx.trace_data
    if not trace_data then
        return
    end

    local finished_at = ngx.now()
    trace_data.end_time = finished_at
    trace_data.duration = finished_at - trace_data.start_time
    trace_data.status = ngx.status
    trace_data.response_headers = ngx.resp.get_headers()

    debugger.debug_log("Request completed", trace_data)
end
--- Run a function and, in debug mode, log its duration and the change in
-- Lua heap size (KB).
-- @param name label used in the log record
-- @param block_func function to run; called without arguments
-- @return everything block_func returned, in both modes
function debugger.profile_block(name, block_func)
    if not DEBUG_MODE then
        return block_func()
    end

    -- Capture the results together with their exact count so that nil
    -- results are returned just as they are outside debug mode.
    local function capture(...)
        return select("#", ...), {...}
    end

    local start_time = ngx.now()
    local start_memory = collectgarbage("count")
    local result_count, results = capture(block_func())
    local end_time = ngx.now()
    local end_memory = collectgarbage("count")

    debugger.debug_log("Block profiled: " .. name, {
        duration = end_time - start_time,
        memory_used = end_memory - start_memory,
        memory_unit = "KB"
    })
    return unpack(results, 1, result_count)
end
return debugger
5. 最佳实践总结
5.1 开发规范
代码组织
- 采用模块化设计,职责分离
- 统一的错误处理机制
- 完善的日志记录
- 合理的配置管理
性能优化
- 合理使用连接池
- 多级缓存策略
- 批量操作优化
- 内存使用监控
安全实践
- 输入验证和过滤
- 认证和授权机制
- 限流和防护
- 安全配置管理
5.2 运维规范
监控告警
- 完善的监控指标
- 及时的告警机制
- 健康检查端点
- 性能基线建立
部署管理
- 自动化部署流程
- 灰度发布策略
- 回滚机制
- 配置热更新
故障处理
- 快速诊断工具
- 详细的调试信息
- 故障恢复流程
- 事后分析总结
通过本章的学习,你应该掌握了OpenResty在实际项目中的最佳实践,包括代码组织、性能优化、故障排查等各个方面。这些实践经验将帮助你构建高质量、高性能的OpenResty应用。