1. 数据库支持概览

1.1 支持的数据库

OpenResty通过各种lua-resty库支持多种数据库:

- MySQL: lua-resty-mysql
- PostgreSQL: lua-resty-postgres
- Redis: lua-resty-redis
- MongoDB: lua-resty-mongol
- InfluxDB: lua-resty-influxdb
- Elasticsearch: lua-resty-elasticsearch

1.2 连接池机制

所有数据库连接都支持连接池,这是OpenResty高性能的关键特性:

- 连接复用
- 自动管理连接生命周期
- 支持连接超时和重试
- 负载均衡和故障转移

2. MySQL数据库操作

2.1 基础配置和连接

local mysql = require "resty.mysql"

--- Open a MySQL connection, reusing one from the "mysql_pool" pool when possible.
-- @treturn table|nil the resty.mysql object, or nil on failure
-- @treturn string|nil error message when the first value is nil
local function connect_mysql()
    local conn, new_err = mysql:new()
    if not conn then
        ngx.log(ngx.ERR, "failed to instantiate mysql: ", new_err)
        return nil, new_err
    end

    -- Timeout in milliseconds (5 seconds).
    conn:set_timeout(5000)

    local conn_opts = {
        host = "127.0.0.1",
        port = 3306,
        database = "test_db",
        user = "root",
        password = "password",
        charset = "utf8mb4",
        max_packet_size = 1024 * 1024,  -- 1MB
        pool = "mysql_pool",            -- connection pool name
        pool_size = 100,                -- connection pool size
        backlog = 50                    -- size of the wait queue
    }

    local connected, conn_err, errcode, sqlstate = conn:connect(conn_opts)
    if connected then
        return conn
    end

    ngx.log(ngx.ERR, "failed to connect to mysql: ", conn_err, ": ", errcode, " ", sqlstate)
    return nil, conn_err
end

--- Hand a MySQL connection back to the keepalive pool.
-- Does nothing when `db` is nil. If pooling fails, the error is logged and
-- the connection is closed instead.
-- @param db resty.mysql object (may be nil)
local function close_mysql(db)
    if db then
        -- 30 second idle timeout, up to 100 pooled connections.
        local pooled, keepalive_err = db:set_keepalive(30000, 100)
        if not pooled then
            ngx.log(ngx.ERR, "failed to set keepalive: ", keepalive_err)
            db:close()
        end
    end
end

2.2 基本CRUD操作

local mysql = require "resty.mysql"
local cjson = require "cjson"

-- 用户管理模块
local _M = {}

--- Insert a new row into `users`.
-- resty.mysql's `query(sql, nrows)` has no bind-parameter support, so the
-- statement is assembled here: strings go through `ngx.quote_sql_str` and
-- the age is validated as a number before being embedded.
-- @tparam table user_data fields `name`, `email`, `age` (nil fields become NULL)
-- @treturn number|nil insert id of the new row, or nil on failure
-- @treturn string|nil error message on failure
function _M.create_user(user_data)
    if type(user_data) ~= "table" then
        return nil, "invalid user data"
    end

    local age_sql = "NULL"
    if user_data.age ~= nil then
        local age = tonumber(user_data.age)
        if not age or age ~= age or math.abs(age) == math.huge then
            return nil, "invalid age"
        end
        age_sql = string.format("%d", math.floor(age))
    end

    -- Quote a value as a MySQL string literal; nil maps to SQL NULL.
    local function quote(v)
        if v == nil then
            return "NULL"
        end
        return ngx.quote_sql_str(tostring(v))
    end

    local db, err = connect_mysql()
    if not db then
        return nil, err
    end

    local sql = string.format(
        "INSERT INTO users (name, email, age, created_at) VALUES (%s, %s, %s, NOW())",
        quote(user_data.name), quote(user_data.email), age_sql)

    local res, qerr, errcode, sqlstate = db:query(sql)

    close_mysql(db)

    if not res then
        ngx.log(ngx.ERR, "failed to insert user: ", qerr, ": ", errcode, " ", sqlstate)
        return nil, qerr
    end

    return res.insert_id
end

--- Fetch a single user by id.
-- resty.mysql has no bind parameters, so the id is validated as a finite
-- number and embedded as an integer literal.
-- @param user_id numeric id (number or numeric string)
-- @treturn table|nil the user row, or nil on failure / when not found
-- @treturn string|nil error message ("User not found" when no row matches)
function _M.get_user(user_id)
    local id = tonumber(user_id)
    if not id or id ~= id or math.abs(id) == math.huge then
        return nil, "invalid user id"
    end

    local db, err = connect_mysql()
    if not db then
        return nil, err
    end

    local sql = string.format(
        "SELECT id, name, email, age, created_at FROM users WHERE id = %d",
        math.floor(id))
    local res, qerr, errcode, sqlstate = db:query(sql)

    close_mysql(db)

    if not res then
        ngx.log(ngx.ERR, "failed to query user: ", qerr, ": ", errcode, " ", sqlstate)
        return nil, qerr
    end

    if #res == 0 then
        return nil, "User not found"
    end

    return res[1]
end

--- Update name, email and age of an existing user.
-- The statement is built with `ngx.quote_sql_str` and validated numbers
-- because resty.mysql's `query` does not support `?` placeholders.
-- @param user_id numeric id (number or numeric string)
-- @tparam table user_data fields `name`, `email`, `age` (nil fields become NULL)
-- @treturn boolean|nil true when at least one row was affected, false when
--   none was, nil on failure
-- @treturn string|nil error message on failure
function _M.update_user(user_id, user_data)
    local id = tonumber(user_id)
    if not id or id ~= id or math.abs(id) == math.huge then
        return nil, "invalid user id"
    end
    if type(user_data) ~= "table" then
        return nil, "invalid user data"
    end

    local age_sql = "NULL"
    if user_data.age ~= nil then
        local age = tonumber(user_data.age)
        if not age or age ~= age or math.abs(age) == math.huge then
            return nil, "invalid age"
        end
        age_sql = string.format("%d", math.floor(age))
    end

    -- Quote a value as a MySQL string literal; nil maps to SQL NULL.
    local function quote(v)
        if v == nil then
            return "NULL"
        end
        return ngx.quote_sql_str(tostring(v))
    end

    local db, err = connect_mysql()
    if not db then
        return nil, err
    end

    local sql = string.format(
        "UPDATE users SET name = %s, email = %s, age = %s, updated_at = NOW() WHERE id = %d",
        quote(user_data.name), quote(user_data.email), age_sql, math.floor(id))
    local res, qerr, errcode, sqlstate = db:query(sql)

    close_mysql(db)

    if not res then
        ngx.log(ngx.ERR, "failed to update user: ", qerr, ": ", errcode, " ", sqlstate)
        return nil, qerr
    end

    return res.affected_rows > 0
end

--- Delete a user by id.
-- The id is validated and embedded as an integer literal, since resty.mysql's
-- `query` does not support `?` placeholders.
-- @param user_id numeric id (number or numeric string)
-- @treturn boolean|nil true when a row was deleted, false when none matched,
--   nil on failure
-- @treturn string|nil error message on failure
function _M.delete_user(user_id)
    local id = tonumber(user_id)
    if not id or id ~= id or math.abs(id) == math.huge then
        return nil, "invalid user id"
    end

    local db, err = connect_mysql()
    if not db then
        return nil, err
    end

    local sql = string.format("DELETE FROM users WHERE id = %d", math.floor(id))
    local res, qerr, errcode, sqlstate = db:query(sql)

    close_mysql(db)

    if not res then
        ngx.log(ngx.ERR, "failed to delete user: ", qerr, ": ", errcode, " ", sqlstate)
        return nil, qerr
    end

    return res.affected_rows > 0
end

--- Return one page of users plus pagination metadata.
-- `page` and `limit` are coerced to integers >= 1 and embedded as integer
-- literals, because resty.mysql's `query` has no `?` placeholders.
-- @param[opt=1] page 1-based page number
-- @param[opt=10] limit rows per page
-- @treturn table|nil `{users, total, page, limit, pages}`, or nil on failure
-- @treturn string|nil error message on failure
function _M.get_users(page, limit)
    page = math.max(1, math.floor(tonumber(page) or 1))
    limit = math.max(1, math.floor(tonumber(limit) or 10))
    local offset = (page - 1) * limit

    local db, err = connect_mysql()
    if not db then
        return nil, err
    end

    -- Total row count, used to compute the number of pages.
    local count_res, count_err, count_code, count_state =
        db:query("SELECT COUNT(*) as total FROM users")
    if not count_res then
        close_mysql(db)
        ngx.log(ngx.ERR, "failed to count users: ", count_err, ": ", count_code, " ", count_state)
        return nil, count_err
    end

    local total = tonumber(count_res[1] and count_res[1].total) or 0

    -- Page of rows, newest ids first.
    local sql = string.format(
        "SELECT id, name, email, age, created_at FROM users ORDER BY id DESC LIMIT %d OFFSET %d",
        limit, offset)
    local res, qerr, errcode, sqlstate = db:query(sql)

    close_mysql(db)

    if not res then
        ngx.log(ngx.ERR, "failed to query users: ", qerr, ": ", errcode, " ", sqlstate)
        return nil, qerr
    end

    return {
        users = res,
        total = total,
        page = page,
        limit = limit,
        pages = math.ceil(total / limit)
    }
end

return _M

2.3 事务处理

local mysql = require "resty.mysql"

--- Move `amount` from one account to another inside a single transaction.
-- Steps: lock the source row (SELECT ... FOR UPDATE), check the balance,
-- debit, credit, write a transfer log row, then COMMIT. Any failure triggers
-- a ROLLBACK. Statements are assembled from validated numbers because
-- resty.mysql's `query` does not support `?` placeholders.
-- @param from_account numeric id of the source account
-- @param to_account numeric id of the target account
-- @param amount positive, finite amount to transfer
-- @treturn boolean|nil true on success, nil on failure
-- @treturn string|nil error message on failure
local function transfer_money(from_account, to_account, amount)
    local from_id = tonumber(from_account)
    local to_id = tonumber(to_account)
    local sum = tonumber(amount)

    if not from_id or not to_id then
        return nil, "Invalid account id"
    end
    -- `sum > 0` is false for NaN, so NaN is rejected here as well.
    if not (sum and sum > 0 and sum < math.huge) then
        return nil, "Invalid amount"
    end

    local from_sql = string.format("%d", math.floor(from_id))
    local to_sql = string.format("%d", math.floor(to_id))
    local amount_sql = tostring(sum)

    local db, err = connect_mysql()
    if not db then
        return nil, err
    end

    local res, start_err = db:query("START TRANSACTION")
    if not res then
        close_mysql(db)
        return nil, "Failed to start transaction: " .. tostring(start_err)
    end

    -- Undo the transaction and release the connection.
    local function rollback()
        local rb_ok, rb_err = db:query("ROLLBACK")
        if not rb_ok then
            ngx.log(ngx.ERR, "failed to rollback transaction: ", rb_err)
        end
        close_mysql(db)
    end

    -- Lock the source row and read its balance.
    local balance_res, balance_err = db:query(
        "SELECT balance FROM accounts WHERE id = " .. from_sql .. " FOR UPDATE")
    if not balance_res or #balance_res == 0 then
        if not balance_res then
            ngx.log(ngx.ERR, "failed to query source account: ", balance_err)
        end
        rollback()
        return nil, "Source account not found"
    end

    local current_balance = tonumber(balance_res[1].balance)
    if not current_balance or current_balance < sum then
        rollback()
        return nil, "Insufficient balance"
    end

    -- Debit the source account.
    local debit_res, debit_err = db:query(
        "UPDATE accounts SET balance = balance - " .. amount_sql .. " WHERE id = " .. from_sql)
    if not debit_res or debit_res.affected_rows == 0 then
        if not debit_res then
            ngx.log(ngx.ERR, "failed to debit source account: ", debit_err)
        end
        rollback()
        return nil, "Failed to debit source account"
    end

    -- Credit the target account.
    local credit_res, credit_err = db:query(
        "UPDATE accounts SET balance = balance + " .. amount_sql .. " WHERE id = " .. to_sql)
    if not credit_res or credit_res.affected_rows == 0 then
        if not credit_res then
            ngx.log(ngx.ERR, "failed to credit target account: ", credit_err)
        end
        rollback()
        return nil, "Failed to credit target account"
    end

    -- Record the transfer.
    local log_res, log_err = db:query(
        "INSERT INTO transfer_logs (from_account, to_account, amount, created_at) VALUES ("
        .. from_sql .. ", " .. to_sql .. ", " .. amount_sql .. ", NOW())")
    if not log_res then
        ngx.log(ngx.ERR, "failed to log transfer: ", log_err)
        rollback()
        return nil, "Failed to log transfer"
    end

    local commit_res, commit_err = db:query("COMMIT")
    if not commit_res then
        rollback()
        return nil, "Failed to commit transaction: " .. tostring(commit_err)
    end

    close_mysql(db)
    return true
end

-- Example: run a transfer and report the outcome to the client.
local success, err = transfer_money(1, 2, 100.00)
if not success then
    ngx.log(ngx.ERR, "Transfer failed: ", err)
    ngx.status = 400
    ngx.say("Transfer failed: ", err)
else
    ngx.say("Transfer completed successfully")
end

2.4 批量操作

--- Insert several users with one multi-row INSERT.
-- Each row is rendered as a literal tuple: strings are escaped with
-- `ngx.quote_sql_str` and ages are validated as numbers, because
-- resty.mysql's `query` does not support `?` placeholders.
-- @tparam table users array of `{name, email, age}` tables
-- @treturn number|nil affected row count (0 for an empty list), nil on failure
-- @treturn string|nil error message on failure
local function batch_insert_users(users)
    if type(users) ~= "table" then
        return nil, "users must be a table"
    end
    -- An empty VALUES list would be invalid SQL; there is nothing to insert.
    if #users == 0 then
        return 0
    end

    -- Quote a value as a MySQL string literal; nil maps to SQL NULL.
    local function quote(v)
        if v == nil then
            return "NULL"
        end
        return ngx.quote_sql_str(tostring(v))
    end

    local rows = {}
    for i, user in ipairs(users) do
        local age_sql = "NULL"
        if user.age ~= nil then
            local age = tonumber(user.age)
            if not age or age ~= age or math.abs(age) == math.huge then
                return nil, "invalid age for user #" .. i
            end
            age_sql = string.format("%d", math.floor(age))
        end
        rows[i] = "(" .. quote(user.name) .. ", " .. quote(user.email)
            .. ", " .. age_sql .. ", NOW())"
    end

    local db, err = connect_mysql()
    if not db then
        return nil, err
    end

    local sql = "INSERT INTO users (name, email, age, created_at) VALUES "
        .. table.concat(rows, ", ")

    local res, qerr, errcode, sqlstate = db:query(sql)

    close_mysql(db)

    if not res then
        ngx.log(ngx.ERR, "failed to batch insert users: ", qerr, ": ", errcode, " ", sqlstate)
        return nil, qerr
    end

    return res.affected_rows
end

-- Example: insert three users in one statement and report the result.
local users_data = {
    { name = "Alice", email = "alice@example.com", age = 25 },
    { name = "Bob", email = "bob@example.com", age = 30 },
    { name = "Charlie", email = "charlie@example.com", age = 35 },
}

local affected_rows, err = batch_insert_users(users_data)
if not affected_rows then
    ngx.say("Batch insert failed: ", err)
else
    ngx.say("Inserted ", affected_rows, " users")
end

3. Redis操作

3.1 基础连接和操作

local redis = require "resty.redis"

--- Open a Redis connection (pool "redis_pool") and select database 0.
-- @treturn table|nil the resty.redis object, or nil on failure
-- @treturn string|nil error message on failure
local function connect_redis()
    local red, new_err = redis:new()
    if not red then
        ngx.log(ngx.ERR, "failed to instantiate redis: ", new_err)
        return nil, new_err
    end

    red:set_timeout(5000)  -- 5 second timeout

    local ok, err = red:connect("127.0.0.1", 6379, {
        pool = "redis_pool",
        pool_size = 100,
        backlog = 50
    })

    if not ok then
        ngx.log(ngx.ERR, "failed to connect to redis: ", err)
        return nil, err
    end

    -- Authentication (if required); close the connection on failure:
    -- local res, err = red:auth("password")
    -- if not res then
    --     red:close()
    --     return nil, err
    -- end

    -- Select the logical database.
    local res, select_err = red:select(0)
    if not res then
        ngx.log(ngx.ERR, "failed to select redis database: ", select_err)
        -- Close instead of leaking the socket; the caller never receives
        -- the object and therefore cannot release it.
        red:close()
        return nil, select_err
    end

    return red
end

--- Return a Redis connection to the keepalive pool.
-- Does nothing when `red` is nil. If pooling fails, the error is logged and
-- the connection is closed instead.
-- @param red resty.redis object (may be nil)
local function close_redis(red)
    if red then
        local pooled, keepalive_err = red:set_keepalive(30000, 100)
        if not pooled then
            ngx.log(ngx.ERR, "failed to set keepalive: ", keepalive_err)
            red:close()
        end
    end
end

3.2 缓存操作

local redis = require "resty.redis"
local cjson = require "cjson"

-- 缓存管理模块
local cache = {}

--- Store a value in Redis, optionally with an expiry.
-- Tables are JSON-encoded; every other value is stored via `tostring`.
-- @param key cache key
-- @param value value to store
-- @param[opt] ttl expiry in seconds; when given SETEX is used, otherwise SET
-- @treturn boolean true on success, false on failure
-- @treturn string|nil error message on failure
function cache.set(key, value, ttl)
    local red, conn_err = connect_redis()
    if not red then
        return false, conn_err
    end

    -- Serialize the value.
    local payload = type(value) == "table" and cjson.encode(value) or tostring(value)

    local stored, store_err
    if not ttl then
        stored, store_err = red:set(key, payload)
    else
        stored, store_err = red:setex(key, ttl, payload)
    end

    close_redis(red)

    if stored then
        return true
    end

    ngx.log(ngx.ERR, "failed to set cache: ", store_err)
    return false, store_err
end

--- Read a value from Redis and JSON-decode it when possible.
-- A failed command is reported with its real error; only a missing key
-- (`ngx.null` reply) yields "not found".
-- Note: any stored string that happens to be valid JSON is returned decoded.
-- @param key cache key
-- @return the decoded value, the raw string, or nil
-- @treturn string|nil error message ("not found" when the key is absent)
function cache.get(key)
    local red, err = connect_redis()
    if not red then
        return nil, err
    end

    local value, get_err = red:get(key)

    close_redis(red)

    -- Check for a command failure first; otherwise it would be reported
    -- as a cache miss.
    if not value then
        ngx.log(ngx.ERR, "failed to get cache: ", get_err)
        return nil, get_err or "unknown error"
    end

    if value == ngx.null then
        return nil, "not found"
    end

    -- Try to deserialize JSON; fall back to the raw string.
    local ok, decoded = pcall(cjson.decode, value)
    if ok then
        return decoded
    end
    return value
end

--- Delete a key from Redis.
-- @param key cache key
-- @treturn boolean true when DEL reports at least one removed key
-- @treturn string|nil error message on failure
function cache.delete(key)
    local red, conn_err = connect_redis()
    if not red then
        return false, conn_err
    end

    local removed, del_err = red:del(key)

    close_redis(red)

    if removed then
        return removed > 0
    end

    ngx.log(ngx.ERR, "failed to delete cache: ", del_err)
    return false, del_err
end

--- Fetch several keys at once with MGET.
-- Missing keys are omitted from the result. Values that parse as JSON are
-- returned decoded (including a decoded `false`); others stay raw strings.
-- @tparam table keys array of cache keys
-- @treturn table|nil map of key -> value (empty for an empty key list),
--   or nil on failure
-- @treturn string|nil error message on failure
function cache.mget(keys)
    if type(keys) ~= "table" then
        return nil, "keys must be a table"
    end
    -- MGET needs at least one key; skip the round trip for an empty list.
    if #keys == 0 then
        return {}
    end

    local red, err = connect_redis()
    if not red then
        return nil, err
    end

    local values, mget_err = red:mget(unpack(keys))

    close_redis(red)

    if not values then
        ngx.log(ngx.ERR, "failed to mget cache: ", mget_err)
        return nil, mget_err
    end

    local result = {}
    for i, value in ipairs(values) do
        if value ~= ngx.null then
            local ok, decoded = pcall(cjson.decode, value)
            -- Explicit branch: `ok and decoded or value` would discard a
            -- decoded JSON `false`.
            if ok then
                result[keys[i]] = decoded
            else
                result[keys[i]] = value
            end
        end
    end

    return result
end

-- Example: cache a user record for five minutes and read it back.
local user_data = {
    id = 123,
    name = "John Doe",
    email = "john@example.com",
}

-- 300 seconds = 5 minutes.
cache.set("user:123", user_data, 300)

local cached_user = cache.get("user:123")
if not cached_user then
    ngx.say("User not found in cache")
else
    ngx.say("Cached user: ", cjson.encode(cached_user))
end

3.3 分布式锁

local redis = require "resty.redis"

-- 分布式锁实现
local lock = {}

--- Try to acquire a Redis-backed lock using `SET key value NX EX timeout`.
-- The lock value is the owner token that must be passed to `lock.release`.
-- It combines a millisecond timestamp, the worker pid and a random number,
-- so that two requests handled by the same worker within the same second do
-- not get identical tokens (which would let one release the other's lock).
-- @param key resource name; the Redis key is "lock:" .. key
-- @param[opt=10] timeout lock expiry in seconds
-- @param[opt=3] retry_times number of attempts, 100 ms apart
-- @treturn boolean true when the lock was acquired
-- @treturn string the lock token on success, an error message otherwise
function lock.acquire(key, timeout, retry_times)
    timeout = timeout or 10
    retry_times = retry_times or 3

    local lock_key = "lock:" .. key
    local lock_value = string.format("%s:%d:%d",
        tostring(ngx.now()), ngx.worker.pid(), math.random(0, 2147483647))

    for i = 1, retry_times do
        local red, err = connect_redis()
        if not red then
            return false, err
        end

        -- Atomic set-if-absent with expiry.
        local ok, set_err = red:set(lock_key, lock_value, "NX", "EX", timeout)

        close_redis(red)

        -- A null reply means the key already exists (lock held elsewhere).
        if ok and ok ~= ngx.null then
            return true, lock_value
        end

        if not ok then
            ngx.log(ngx.ERR, "failed to set lock key: ", set_err)
        end

        if i < retry_times then
            ngx.sleep(0.1)  -- wait 100 ms before retrying
        end
    end

    return false, "Failed to acquire lock after " .. retry_times .. " attempts"
end

--- Release a lock, but only if it is still owned by `lock_value`.
-- The compare-and-delete runs as a server-side Lua script so the check and
-- the delete happen atomically.
-- @param key resource name used when acquiring
-- @param lock_value owner token returned by `lock.acquire`
-- @treturn boolean true when the key was deleted
-- @treturn string|nil error message on connection or script failure
function lock.release(key, lock_value)
    local red, conn_err = connect_redis()
    if not red then
        return false, conn_err
    end

    local script = [[
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    ]]

    local deleted, eval_err = red:eval(script, 1, "lock:" .. key, lock_value)

    close_redis(red)

    if deleted then
        return deleted == 1
    end

    ngx.log(ngx.ERR, "failed to release lock: ", eval_err)
    return false, eval_err
end

--- Example of guarding work with the distributed lock.
-- Acquires "critical_resource" with a 30 second expiry, does one second of
-- simulated work, then releases the lock.
-- @treturn boolean true when the work ran, false when the lock was busy
-- @treturn string status message
local function critical_section()
    local resource = "critical_resource"
    local got_lock, token = lock.acquire(resource, 30)

    if not got_lock then
        -- On failure the second return value is the error message.
        ngx.log(ngx.WARN, "Failed to acquire lock: ", token)
        return false, "Resource is busy"
    end

    ngx.log(ngx.INFO, "Executing critical section")

    -- Simulated work.
    ngx.sleep(1)

    if not lock.release(resource, token) then
        ngx.log(ngx.WARN, "Failed to release lock")
    end

    return true, "Operation completed"
end

-- Example call: print the outcome with a matching prefix.
local success, message = critical_section()
local prefix = "Error: "
if success then
    prefix = "Success: "
end
ngx.say(prefix, message)

3.4 发布订阅

local redis = require "resty.redis"
local cjson = require "cjson"

--- Publish a JSON envelope to a Redis channel.
-- The envelope carries `timestamp` (ngx.time()), `data` (the message) and
-- `source` ("openresty").
-- @param channel channel name
-- @param message payload placed in the envelope's `data` field
-- @treturn boolean true on success, false on failure
-- @return subscriber count reported by PUBLISH, or an error message
local function publish_message(channel, message)
    local red, conn_err = connect_redis()
    if not red then
        return false, conn_err
    end

    local envelope = {
        timestamp = ngx.time(),
        data = message,
        source = "openresty"
    }
    local receivers, pub_err = red:publish(channel, cjson.encode(envelope))

    close_redis(red)

    if receivers then
        ngx.log(ngx.INFO, "Message published to ", receivers, " subscribers")
        return true, receivers
    end

    ngx.log(ngx.ERR, "failed to publish message: ", pub_err)
    return false, pub_err
end

--- Subscribe to Redis channels and dispatch every message to `callback`.
-- Blocks in a read loop until a non-timeout read error occurs or the worker
-- is shutting down. Cosockets are not available directly inside
-- init_worker_by_lua*, so start this from a timer (e.g. `ngx.timer.at(0, ...)`).
-- @tparam table channels array of channel names
-- @tparam function callback called as `callback(channel, message)`; payloads
--   that are not valid JSON are passed as `{data = raw_string}`
local function subscribe_messages(channels, callback)
    local red, err = connect_redis()
    if not red then
        ngx.log(ngx.ERR, "failed to connect for subscription: ", err)
        return
    end

    local ok, sub_err = red:subscribe(unpack(channels))
    if not ok then
        ngx.log(ngx.ERR, "failed to subscribe: ", sub_err)
        -- Do not leak the socket when subscribing fails.
        red:close()
        return
    end

    while true do
        local res, read_err = red:read_reply()
        if res then
            if res[1] == "message" then
                local channel = res[2]
                local message = res[3]

                -- Parse the payload; fall back to wrapping the raw string.
                local decoded_ok, parsed_message = pcall(cjson.decode, message)
                if decoded_ok then
                    callback(channel, parsed_message)
                else
                    callback(channel, {data = message})
                end
            end
        elseif read_err == "timeout" then
            -- An idle channel is not an error; keep waiting unless the
            -- worker is exiting.
            if ngx.worker.exiting() then
                break
            end
        else
            ngx.log(ngx.ERR, "failed to read reply: ", read_err)
            break
        end
    end

    -- A subscribed connection must not go back to the pool.
    red:close()
end

--- Sample subscription callback: logs each message and reacts per channel.
-- @param channel channel the message arrived on
-- @param message decoded message; `message.data` is read for both channels
local function message_handler(channel, message)
    local encoded = cjson.encode(message)
    ngx.log(ngx.INFO, "Received message on ", channel, ": ", encoded)

    if channel == "system_alerts" then
        -- System alert.
        ngx.log(ngx.WARN, "System alert: ", message.data.alert)
        return
    end

    if channel ~= "user_events" then
        return
    end

    -- User event.
    local event = message.data
    if event.action == "login" then
        ngx.log(ngx.INFO, "User ", event.user_id, " logged in")
    end
end

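-- Start the subscriber from init_worker_by_lua*. Cosockets are not available
-- directly in that phase, so launch it from a zero-delay timer:
-- ngx.timer.at(0, function()
--     subscribe_messages({"user_events", "system_alerts"}, message_handler)
-- end)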

-- Example: publish a login event and report how many subscribers got it.
local login_event = {
    action = "login",
    user_id = 123,
    ip = ngx.var.remote_addr
}
local success, subscribers = publish_message("user_events", login_event)

if not success then
    ngx.say("Failed to publish message")
else
    ngx.say("Message published to ", subscribers, " subscribers")
end

4. PostgreSQL操作

4.1 基础连接

local postgres = require "resty.postgres"

--- Open a PostgreSQL connection using the "postgres_pool" pool settings.
-- @treturn table|nil the resty.postgres object, or nil on failure
-- @treturn string|nil error message on failure
local function connect_postgres()
    local conn = postgres:new()

    conn:set_timeout(5000)

    local conn_opts = {
        host = "127.0.0.1",
        port = 5432,
        database = "test_db",
        user = "postgres",
        password = "password",
        pool = "postgres_pool",
        pool_size = 50,
        backlog = 30
    }

    local connected, conn_err = conn:connect(conn_opts)
    if connected then
        return conn
    end

    ngx.log(ngx.ERR, "failed to connect to postgres: ", conn_err)
    return nil, conn_err
end

--- Return a PostgreSQL connection to the keepalive pool.
-- Does nothing when `pg` is nil. If pooling fails, the error is logged and
-- the connection is closed instead.
-- @param pg resty.postgres object (may be nil)
local function close_postgres(pg)
    if pg then
        local pooled, keepalive_err = pg:set_keepalive(30000, 50)
        if not pooled then
            ngx.log(ngx.ERR, "failed to set keepalive: ", keepalive_err)
            pg:close()
        end
    end
end

4.2 基本操作

-- PostgreSQL用户管理
local pg_users = {}

--- Insert a user and return the generated id (INSERT ... RETURNING id).
-- NOTE(review): this passes $1..$3 bind values as extra arguments to
-- `pg:query`; confirm that the resty.postgres version in use supports that.
-- @tparam table user_data fields `name`, `email`, `age`
-- @return id of the new row, or nil on failure
-- @treturn string|nil error message on failure
function pg_users.create(user_data)
    local pg, conn_err = connect_postgres()
    if not pg then
        return nil, conn_err
    end

    local sql = [[
        INSERT INTO users (name, email, age, created_at) 
        VALUES ($1, $2, $3, NOW()) 
        RETURNING id
    ]]

    local rows, query_err = pg:query(sql, user_data.name, user_data.email, user_data.age)

    close_postgres(pg)

    if rows then
        return rows[1].id
    end

    ngx.log(ngx.ERR, "failed to create user: ", query_err)
    return nil, query_err
end

--- Fetch a single user by id.
-- NOTE(review): relies on `pg:query` accepting a $1 bind value; confirm
-- against the resty.postgres version in use.
-- @param user_id id of the user
-- @treturn table|nil the user row, or nil on failure / when not found
-- @treturn string|nil error message ("User not found" when no row matches)
function pg_users.get(user_id)
    local pg, conn_err = connect_postgres()
    if not pg then
        return nil, conn_err
    end

    local rows, query_err = pg:query(
        "SELECT id, name, email, age, created_at FROM users WHERE id = $1",
        user_id)

    close_postgres(pg)

    if not rows then
        ngx.log(ngx.ERR, "failed to get user: ", query_err)
        return nil, query_err
    end

    if #rows ~= 0 then
        return rows[1]
    end

    return nil, "User not found"
end

--- Merge `profile_data` into a user's JSONB `profile` column.
-- Uses the `||` operator on the existing profile and the JSON-encoded input,
-- and returns the resulting profile (RETURNING profile).
-- NOTE(review): relies on `pg:query` accepting $1/$2 bind values; confirm
-- against the resty.postgres version in use.
-- @param user_id id of the user
-- @tparam table profile_data fields to merge into the profile
-- @return the updated profile, nil when no row matched, or nil plus error
-- @treturn string|nil error message on failure
function pg_users.update_profile(user_id, profile_data)
    local pg, conn_err = connect_postgres()
    if not pg then
        return nil, conn_err
    end

    local sql = [[
        UPDATE users 
        SET profile = profile || $2::jsonb, updated_at = NOW() 
        WHERE id = $1
        RETURNING profile
    ]]

    local encoder = require("cjson")
    local rows, query_err = pg:query(sql, user_id, encoder.encode(profile_data))

    close_postgres(pg)

    if not rows then
        ngx.log(ngx.ERR, "failed to update profile: ", query_err)
        return nil, query_err
    end

    local first = rows[1]
    return first and first.profile
end

return pg_users

5. 数据库连接池管理

5.1 连接池配置

-- 数据库连接管理器
local db_manager = {}

-- Per-backend connection settings read by the db_manager functions.
-- `timeout` is passed to set_timeout (milliseconds); `keepalive_timeout` and
-- `keepalive_pool` are passed to set_keepalive when a connection is released.
-- NOTE(review): `keepalive_pool` (50) differs from `pool_size` (100) for the
-- same named pool — confirm which value is intended to take effect.
local db_configs = {
    mysql = {
        host = "127.0.0.1",
        port = 3306,
        database = "app_db",
        user = "app_user",
        password = "app_password",
        charset = "utf8mb4",
        pool = "mysql_pool",
        pool_size = 100,
        backlog = 50,
        timeout = 5000,
        keepalive_timeout = 30000,
        keepalive_pool = 50
    },
    redis = {
        host = "127.0.0.1",
        port = 6379,
        -- Logical database index; get_redis only issues SELECT when it is not 0.
        database = 0,
        pool = "redis_pool",
        pool_size = 100,
        backlog = 50,
        timeout = 5000,
        keepalive_timeout = 30000,
        keepalive_pool = 50
    }
}

--- Open a MySQL connection using `db_configs.mysql`.
-- @treturn table|nil the resty.mysql object, or nil on failure
-- @treturn string|nil error message on failure
function db_manager.get_mysql()
    local mysql = require "resty.mysql"
    local cfg = db_configs.mysql

    local conn, new_err = mysql:new()
    if not conn then
        return nil, new_err
    end

    conn:set_timeout(cfg.timeout)

    local conn_opts = {
        host = cfg.host,
        port = cfg.port,
        database = cfg.database,
        user = cfg.user,
        password = cfg.password,
        charset = cfg.charset,
        pool = cfg.pool,
        pool_size = cfg.pool_size,
        backlog = cfg.backlog
    }

    local connected, conn_err = conn:connect(conn_opts)
    if connected then
        return conn
    end

    return nil, conn_err
end

--- Return a MySQL connection to the keepalive pool.
-- Uses `keepalive_timeout` / `keepalive_pool` from `db_configs.mysql`.
-- Does nothing when `db` is nil; on pooling failure the error is logged and
-- the connection is closed.
-- @param db resty.mysql object (may be nil)
function db_manager.close_mysql(db)
    if db then
        local cfg = db_configs.mysql
        local pooled, keepalive_err = db:set_keepalive(cfg.keepalive_timeout, cfg.keepalive_pool)
        if not pooled then
            ngx.log(ngx.ERR, "failed to set mysql keepalive: ", keepalive_err)
            db:close()
        end
    end
end

--- Open a Redis connection using `db_configs.redis`.
-- Issues SELECT only when the configured database is not 0.
-- @treturn table|nil the resty.redis object, or nil on failure
-- @treturn string|nil error message on failure
function db_manager.get_redis()
    local redis = require "resty.redis"
    local config = db_configs.redis

    local red, new_err = redis:new()
    if not red then
        return nil, new_err
    end
    red:set_timeout(config.timeout)

    local ok, err = red:connect(config.host, config.port, {
        pool = config.pool,
        pool_size = config.pool_size,
        backlog = config.backlog
    })

    if not ok then
        return nil, err
    end

    if config.database ~= 0 then
        local res, select_err = red:select(config.database)
        if not res then
            -- Close instead of leaking the socket; the caller never receives
            -- the object and therefore cannot release it.
            red:close()
            return nil, select_err
        end
    end

    return red
end

--- Return a Redis connection to the keepalive pool.
-- Uses `keepalive_timeout` / `keepalive_pool` from `db_configs.redis`.
-- Does nothing when `red` is nil; on pooling failure the error is logged and
-- the connection is closed.
-- @param red resty.redis object (may be nil)
function db_manager.close_redis(red)
    if red then
        local cfg = db_configs.redis
        local pooled, keepalive_err = red:set_keepalive(cfg.keepalive_timeout, cfg.keepalive_pool)
        if not pooled then
            ngx.log(ngx.ERR, "failed to set redis keepalive: ", keepalive_err)
            red:close()
        end
    end
end

--- Probe MySQL (SELECT 1) and Redis (PING).
-- A backend counts as healthy only when a connection could be obtained and
-- the probe succeeded.
-- @treturn table `{mysql = boolean, redis = boolean}`
function db_manager.health_check()
    local mysql_ok, redis_ok = false, false

    -- MySQL probe.
    local db = db_manager.get_mysql()
    if db then
        if db:query("SELECT 1") then
            mysql_ok = true
        end
        db_manager.close_mysql(db)
    end

    -- Redis probe.
    local red = db_manager.get_redis()
    if red then
        local reply = red:ping()
        if reply == "PONG" then
            redis_ok = true
        end
        db_manager.close_redis(red)
    end

    return {
        mysql = mysql_ok,
        redis = redis_ok
    }
end

return db_manager

5.2 连接池监控

--- Collect all "pool_*" counters from the `my_stats` shared dictionary.
-- @treturn table map of counter key -> value; empty when the dictionary is
--   not configured
local function get_pool_stats()
    local stats = {}

    local dict = ngx.shared.my_stats
    if not dict then
        return stats
    end

    -- get_keys(0) requests every key rather than the default-limited subset.
    for _, key in ipairs(dict:get_keys(0)) do
        if string.find(key, "^pool_") then
            local value = dict:get(key)
            stats[key] = value
        end
    end

    return stats
end

--- Increment the usage counter "pool_<pool_name>_<action>" in `my_stats`.
-- Does nothing when the shared dictionary is not configured.
-- @param pool_name pool identifier, e.g. "mysql"
-- @param action action label, e.g. "get" or "close"
local function record_pool_usage(pool_name, action)
    local dict = ngx.shared.my_stats
    if dict then
        local counter_key = "pool_" .. pool_name .. "_" .. action
        -- Start at 0 when the key is missing; new counters expire after 1 hour.
        dict:incr(counter_key, 1, 0, 3600)
    end
end

-- Wrap the MySQL acquire/release functions so every call is counted
-- before being delegated to the original implementation.
local original_get_mysql = db_manager.get_mysql
local original_close_mysql = db_manager.close_mysql

db_manager.get_mysql = function()
    record_pool_usage("mysql", "get")
    return original_get_mysql()
end

db_manager.close_mysql = function(db)
    record_pool_usage("mysql", "close")
    return original_close_mysql(db)
end

# Monitoring endpoint: returns health and pool counters as JSON.
location /db-stats {
    content_by_lua_block {
        -- NOTE(review): get_pool_stats and db_manager must be reachable
        -- from this block (e.g. loaded via require) — confirm how they are
        -- exposed in the real deployment.
        local cjson = require("cjson")

        local response = {
            health = db_manager.health_check(),
            pool_stats = get_pool_stats(),
            timestamp = ngx.time()
        }

        ngx.header.content_type = "application/json"
        ngx.say(cjson.encode(response))
    }
}

6. 总结

OpenResty提供了丰富的数据库连接库,支持MySQL、PostgreSQL、Redis等主流数据库。通过合理使用连接池、事务处理、错误处理等机制,我们可以构建高性能、高可用的数据库应用。掌握这些数据库操作技能是开发现代Web应用的重要基础。

7. 参考资料