1. 数据库支持概览
1.1 支持的数据库
OpenResty通过各种lua-resty库支持多种数据库(其中lua-resty-mysql和lua-resty-redis随OpenResty官方发行,其余为第三方库):
- MySQL: lua-resty-mysql
- PostgreSQL: lua-resty-postgres
- Redis: lua-resty-redis
- MongoDB: lua-resty-mongol
- InfluxDB: lua-resty-influxdb
- Elasticsearch: lua-resty-elasticsearch
1.2 连接池机制
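基于cosocket的数据库连接都支持连接池(用完后通过set_keepalive放回连接池,下次connect时自动复用),这是OpenResty高性能的关键特性:
- 连接复用
- 自动管理连接生命周期(空闲超时后自动关闭,连接池满时按backlog排队)
- 支持连接超时设置;重试逻辑需要在应用代码中实现
- 负载均衡和故障转移不由连接池本身提供,需要在应用层(例如维护多个后端地址)实现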
2. MySQL数据库操作
2.1 基础配置和连接
local mysql = require "resty.mysql"
--- Create a resty.mysql object and connect it using the tutorial defaults.
-- Failures are logged at ERR level.
-- @return the connected object on success; `nil, err` otherwise
local function connect_mysql()
  local conn, init_err = mysql:new()
  if not conn then
    ngx.log(ngx.ERR, "failed to instantiate mysql: ", init_err)
    return nil, init_err
  end

  -- Timeout in milliseconds (5 seconds).
  conn:set_timeout(5000)

  local options = {
    host = "127.0.0.1",
    port = 3306,
    database = "test_db",
    user = "root",
    password = "password",
    charset = "utf8mb4",
    max_packet_size = 1024 * 1024, -- 1MB
    pool = "mysql_pool",           -- connection pool name
    pool_size = 100,               -- connection pool size
    backlog = 50,                  -- wait queue size
  }

  local connected, conn_err, errcode, sqlstate = conn:connect(options)
  if connected then
    return conn
  end

  ngx.log(ngx.ERR, "failed to connect to mysql: ", conn_err, ": ", errcode, " ", sqlstate)
  return nil, conn_err
end
--- Return a MySQL connection to the keepalive pool.
-- A nil/false `db` is ignored. When the connection cannot be pooled the
-- failure is logged and the connection is closed instead.
-- @param db connection object obtained from connect_mysql
local function close_mysql(db)
  if db then
    -- 30 second idle timeout, at most 100 pooled connections.
    local pooled, pool_err = db:set_keepalive(30000, 100)
    if not pooled then
      ngx.log(ngx.ERR, "failed to set keepalive: ", pool_err)
      db:close()
    end
  end
end
2.2 基本CRUD操作
local mysql = require "resty.mysql"
local cjson = require "cjson"
-- User management module: the functions below are attached to this table.
local _M = {}
--- Insert a new row into `users` and return its auto-generated id.
--
-- resty.mysql's `query(sql, est_nrows)` does not bind parameters: extra
-- arguments are not substituted for `?` markers. The values are therefore
-- validated/escaped here and inlined into the statement text.
--
-- @param user_data table with `name`, `email` and an optional numeric `age`
-- @return the insert id on success; `nil, err` on invalid input or DB failure
function _M.create_user(user_data)
  if type(user_data) ~= "table"
      or user_data.name == nil or user_data.email == nil then
    return nil, "name and email are required"
  end

  -- A missing age is stored as NULL; anything else must be numeric so it can
  -- be inlined without quoting.
  local age_sql = "NULL"
  if user_data.age ~= nil then
    local age = tonumber(user_data.age)
    if not age then
      return nil, "age must be a number"
    end
    age_sql = tostring(age)
  end

  local db, err = connect_mysql()
  if not db then
    return nil, err
  end

  -- ngx.quote_sql_str escapes the value and wraps it in single quotes.
  local quote = ngx.quote_sql_str
  local sql = "INSERT INTO users (name, email, age, created_at) VALUES ("
    .. quote(tostring(user_data.name)) .. ", "
    .. quote(tostring(user_data.email)) .. ", "
    .. age_sql .. ", NOW())"

  local res, query_err, errcode, sqlstate = db:query(sql)
  close_mysql(db)

  if not res then
    ngx.log(ngx.ERR, "failed to insert user: ", query_err, ": ", errcode, " ", sqlstate)
    return nil, query_err
  end

  return res.insert_id
end
--- Fetch a single user row by id.
--
-- resty.mysql's `query` has no placeholder support, so the id is escaped
-- with ngx.quote_sql_str and inlined into the statement.
--
-- @param user_id id of the user (number or string)
-- @return the row table on success; `nil, err` when the id is missing, the
--         query fails, or no row matches ("User not found")
function _M.get_user(user_id)
  if user_id == nil then
    return nil, "user_id is required"
  end

  local db, err = connect_mysql()
  if not db then
    return nil, err
  end

  local sql = "SELECT id, name, email, age, created_at FROM users WHERE id = "
    .. ngx.quote_sql_str(tostring(user_id))

  local res, query_err, errcode, sqlstate = db:query(sql)
  close_mysql(db)

  if not res then
    ngx.log(ngx.ERR, "failed to query user: ", query_err, ": ", errcode, " ", sqlstate)
    return nil, query_err
  end

  if #res == 0 then
    return nil, "User not found"
  end

  return res[1]
end
--- Overwrite name, email and age of an existing user.
--
-- resty.mysql's `query` has no placeholder support, so every value is
-- validated/escaped and inlined into the statement.
--
-- @param user_id id of the user to update (number or string)
-- @param user_data table with `name`, `email` and an optional numeric `age`
-- @return `true` if a row was changed, `false` if none was;
--         `nil, err` on invalid input or DB failure
function _M.update_user(user_id, user_data)
  if user_id == nil then
    return nil, "user_id is required"
  end
  if type(user_data) ~= "table"
      or user_data.name == nil or user_data.email == nil then
    return nil, "name and email are required"
  end

  -- A missing age is written as NULL; anything else must be numeric.
  local age_sql = "NULL"
  if user_data.age ~= nil then
    local age = tonumber(user_data.age)
    if not age then
      return nil, "age must be a number"
    end
    age_sql = tostring(age)
  end

  local db, err = connect_mysql()
  if not db then
    return nil, err
  end

  local quote = ngx.quote_sql_str
  local sql = "UPDATE users SET name = " .. quote(tostring(user_data.name))
    .. ", email = " .. quote(tostring(user_data.email))
    .. ", age = " .. age_sql
    .. ", updated_at = NOW() WHERE id = " .. quote(tostring(user_id))

  local res, query_err, errcode, sqlstate = db:query(sql)
  close_mysql(db)

  if not res then
    ngx.log(ngx.ERR, "failed to update user: ", query_err, ": ", errcode, " ", sqlstate)
    return nil, query_err
  end

  return res.affected_rows > 0
end
--- Delete a user by id.
--
-- resty.mysql's `query` has no placeholder support, so the id is escaped
-- with ngx.quote_sql_str and inlined into the statement.
--
-- @param user_id id of the user to delete (number or string)
-- @return `true` if a row was deleted, `false` if none matched;
--         `nil, err` when the id is missing or the query fails
function _M.delete_user(user_id)
  if user_id == nil then
    return nil, "user_id is required"
  end

  local db, err = connect_mysql()
  if not db then
    return nil, err
  end

  local sql = "DELETE FROM users WHERE id = " .. ngx.quote_sql_str(tostring(user_id))

  local res, query_err, errcode, sqlstate = db:query(sql)
  close_mysql(db)

  if not res then
    ngx.log(ngx.ERR, "failed to delete user: ", query_err, ": ", errcode, " ", sqlstate)
    return nil, query_err
  end

  return res.affected_rows > 0
end
--- Return one page of users together with paging metadata.
--
-- resty.mysql's `query` has no placeholder support, so `limit` and `offset`
-- are coerced to integers and formatted into the statement. Non-numeric or
-- non-positive arguments fall back to the defaults (page 1, limit 10).
--
-- @param page 1-based page number (default 1)
-- @param limit rows per page (default 10)
-- @return table `{users, total, page, limit, pages}`; `nil, err` on DB failure
function _M.get_users(page, limit)
  page = math.floor(tonumber(page) or 1)
  limit = math.floor(tonumber(limit) or 10)
  if page < 1 then
    page = 1
  end
  if limit < 1 then
    limit = 10
  end
  local offset = (page - 1) * limit

  local db, err = connect_mysql()
  if not db then
    return nil, err
  end

  -- Total row count, used to compute the number of pages.
  local count_res, count_err, count_errcode, count_sqlstate =
    db:query("SELECT COUNT(*) as total FROM users")
  if not count_res then
    close_mysql(db)
    ngx.log(ngx.ERR, "failed to count users: ", count_err, ": ", count_errcode, " ", count_sqlstate)
    return nil, count_err
  end
  local total = tonumber(count_res[1] and count_res[1].total) or 0

  -- The requested page itself.
  local sql = string.format(
    "SELECT id, name, email, age, created_at FROM users ORDER BY id DESC LIMIT %d OFFSET %d",
    limit, offset)
  local res, query_err, errcode, sqlstate = db:query(sql)
  close_mysql(db)

  if not res then
    ngx.log(ngx.ERR, "failed to query users: ", query_err, ": ", errcode, " ", sqlstate)
    return nil, query_err
  end

  return {
    users = res,
    total = total,
    page = page,
    limit = limit,
    pages = math.ceil(total / limit)
  }
end
-- Export the user management module table.
return _M
2.3 事务处理
local mysql = require "resty.mysql"
--- Move `amount` from one account to another inside a single transaction.
--
-- Steps: lock and check the source balance, debit the source, credit the
-- target, write a row to `transfer_logs`, then commit. Any failure rolls the
-- transaction back.
--
-- resty.mysql's `query` has no placeholder support, so account ids are
-- escaped with ngx.quote_sql_str and the amount is validated as a number
-- before being inlined.
--
-- @param from_account id of the account to debit
-- @param to_account id of the account to credit
-- @param amount positive number to transfer
-- @return `true` on success; `nil, err` otherwise
local function transfer_money(from_account, to_account, amount)
  -- A non-positive amount would turn the debit into a credit.
  amount = tonumber(amount)
  if not amount or amount <= 0 then
    return nil, "Invalid transfer amount"
  end
  if from_account == nil or to_account == nil then
    return nil, "Both accounts are required"
  end

  local db, err = connect_mysql()
  if not db then
    return nil, err
  end

  local from_sql = ngx.quote_sql_str(tostring(from_account))
  local to_sql = ngx.quote_sql_str(tostring(to_account))
  local amount_sql = tostring(amount)

  local res, start_err = db:query("START TRANSACTION")
  if not res then
    close_mysql(db)
    return nil, "Failed to start transaction: " .. tostring(start_err)
  end

  -- Undo the transaction. If ROLLBACK itself fails the connection may still
  -- be inside the transaction, so it is closed instead of being pooled.
  local function rollback()
    local ok, rb_err = db:query("ROLLBACK")
    if ok then
      close_mysql(db)
    else
      ngx.log(ngx.ERR, "failed to roll back transfer: ", rb_err)
      db:close()
    end
  end

  -- Lock the source row and read its balance.
  local balance_res, balance_err = db:query(
    "SELECT balance FROM accounts WHERE id = " .. from_sql .. " FOR UPDATE")
  if not balance_res then
    rollback()
    return nil, "Failed to read source account: " .. tostring(balance_err)
  end
  if #balance_res == 0 then
    rollback()
    return nil, "Source account not found"
  end

  local current_balance = tonumber(balance_res[1].balance)
  if not current_balance or current_balance < amount then
    rollback()
    return nil, "Insufficient balance"
  end

  -- Debit the source account.
  local debit_res = db:query(
    "UPDATE accounts SET balance = balance - " .. amount_sql .. " WHERE id = " .. from_sql)
  if not debit_res or debit_res.affected_rows == 0 then
    rollback()
    return nil, "Failed to debit source account"
  end

  -- Credit the target account; zero affected rows means it does not exist.
  local credit_res = db:query(
    "UPDATE accounts SET balance = balance + " .. amount_sql .. " WHERE id = " .. to_sql)
  if not credit_res or credit_res.affected_rows == 0 then
    rollback()
    return nil, "Failed to credit target account"
  end

  -- Record the transfer.
  local log_res = db:query(
    "INSERT INTO transfer_logs (from_account, to_account, amount, created_at) VALUES ("
    .. from_sql .. ", " .. to_sql .. ", " .. amount_sql .. ", NOW())")
  if not log_res then
    rollback()
    return nil, "Failed to log transfer"
  end

  local commit_res, commit_err = db:query("COMMIT")
  if not commit_res then
    rollback()
    return nil, "Failed to commit transaction: " .. tostring(commit_err)
  end

  close_mysql(db)
  return true
end
-- Example: run a transfer and report the outcome to the client.
local success, err = transfer_money(1, 2, 100.00)
if not success then
  ngx.log(ngx.ERR, "Transfer failed: ", err)
  ngx.status = 400
  ngx.say("Transfer failed: ", err)
else
  ngx.say("Transfer completed successfully")
end
2.4 批量操作
--- Insert several users with one multi-row INSERT.
--
-- resty.mysql's `query` has no placeholder support, so each value is
-- validated/escaped and inlined. An empty list returns 0 without touching
-- the database (the generated statement would otherwise be invalid SQL).
--
-- @param users array of tables with `name`, `email` and optional numeric `age`
-- @return number of inserted rows; `nil, err` on invalid input or DB failure
local function batch_insert_users(users)
  if type(users) ~= "table" then
    return nil, "users must be a table"
  end
  if #users == 0 then
    return 0
  end

  -- Build one "(name, email, age, NOW())" tuple per user before connecting,
  -- so invalid input never costs a connection.
  local quote = ngx.quote_sql_str
  local rows = {}
  for i, user in ipairs(users) do
    if type(user) ~= "table" or user.name == nil or user.email == nil then
      return nil, "invalid user at index " .. i .. ": name and email are required"
    end

    local age_sql = "NULL"
    if user.age ~= nil then
      local age = tonumber(user.age)
      if not age then
        return nil, "invalid user at index " .. i .. ": age must be a number"
      end
      age_sql = tostring(age)
    end

    rows[i] = "(" .. quote(tostring(user.name)) .. ", "
      .. quote(tostring(user.email)) .. ", "
      .. age_sql .. ", NOW())"
  end

  local db, err = connect_mysql()
  if not db then
    return nil, err
  end

  local sql = "INSERT INTO users (name, email, age, created_at) VALUES "
    .. table.concat(rows, ", ")
  local res, query_err, errcode, sqlstate = db:query(sql)
  close_mysql(db)

  if not res then
    ngx.log(ngx.ERR, "failed to batch insert users: ", query_err, ": ", errcode, " ", sqlstate)
    return nil, query_err
  end

  return res.affected_rows
end
-- Usage example: insert three users and print how many rows were written.
local users_data = {
  { name = "Alice", email = "alice@example.com", age = 25 },
  { name = "Bob", email = "bob@example.com", age = 30 },
  { name = "Charlie", email = "charlie@example.com", age = 35 },
}

local affected_rows, err = batch_insert_users(users_data)
if not affected_rows then
  ngx.say("Batch insert failed: ", err)
else
  ngx.say("Inserted ", affected_rows, " users")
end
3. Redis操作
3.1 基础连接和操作
local redis = require "resty.redis"
--- Create a resty.redis object, connect it and select database 0.
--
-- If `select` fails the connection is closed before returning, so a failed
-- setup does not leave an open socket behind.
--
-- @return the connected object on success; `nil, err` otherwise
local function connect_redis()
  local red, new_err = redis:new()
  if not red then
    ngx.log(ngx.ERR, "failed to instantiate redis: ", new_err)
    return nil, new_err
  end

  red:set_timeout(5000) -- 5 second timeout

  local ok, err = red:connect("127.0.0.1", 6379, {
    pool = "redis_pool",
    pool_size = 100,
    backlog = 50
  })
  if not ok then
    ngx.log(ngx.ERR, "failed to connect to redis: ", err)
    return nil, err
  end

  -- Authentication (if required)
  -- local res, err = red:auth("password")
  -- if not res then
  --   red:close()
  --   return nil, err
  -- end

  -- Select the database.
  local selected, select_err = red:select(0)
  if not selected then
    ngx.log(ngx.ERR, "failed to select redis database: ", select_err)
    red:close()
    return nil, select_err
  end

  return red
end
--- Return a Redis connection to the keepalive pool.
-- A nil/false `red` is ignored. When pooling fails the error is logged and
-- the connection is closed instead.
-- @param red connection object obtained from connect_redis
local function close_redis(red)
  if red then
    local pooled, pool_err = red:set_keepalive(30000, 100)
    if not pooled then
      ngx.log(ngx.ERR, "failed to set keepalive: ", pool_err)
      red:close()
    end
  end
end
3.2 缓存操作
local redis = require "resty.redis"
local cjson = require "cjson"
-- Cache management module: the functions below are attached to this table.
local cache = {}
--- Store a value under `key`, optionally with an expiry.
-- Tables are JSON-encoded; every other value is stored as `tostring(value)`.
-- @param key cache key
-- @param value value to store
-- @param ttl optional expiry passed to SETEX; when absent a plain SET is used
-- @return `true` on success; `false, err` otherwise
function cache.set(key, value, ttl)
  local red, conn_err = connect_redis()
  if not red then
    return false, conn_err
  end

  -- Serialize the value.
  local payload = type(value) == "table" and cjson.encode(value) or tostring(value)

  local stored, store_err
  if not ttl then
    stored, store_err = red:set(key, payload)
  else
    stored, store_err = red:setex(key, ttl, payload)
  end
  close_redis(red)

  if stored then
    return true
  end

  ngx.log(ngx.ERR, "failed to set cache: ", store_err)
  return false, store_err
end
--- Read a cached value.
--
-- A Redis failure (falsy reply plus error) is logged and returned as the
-- error; only an `ngx.null` reply is reported as "not found". The stored
-- string is JSON-decoded when possible, otherwise returned as-is.
--
-- @param key cache key
-- @return the decoded (or raw) value; `nil, "not found"` for a missing key;
--         `nil, err` on Redis failure
function cache.get(key)
  local red, err = connect_redis()
  if not red then
    return nil, err
  end

  local value, get_err = red:get(key)
  close_redis(red)

  -- Check for failure first so real errors are not mistaken for a miss.
  if not value then
    ngx.log(ngx.ERR, "failed to get cache: ", get_err)
    return nil, get_err
  end

  if value == ngx.null then
    return nil, "not found"
  end

  -- Try to decode JSON; fall back to the raw string.
  local ok, decoded = pcall(cjson.decode, value)
  if ok then
    return decoded
  end
  return value
end
--- Remove `key` from the cache.
-- @param key cache key
-- @return `true` if DEL reported more than zero removed keys, `false` if it
--         reported none; `false, err` on connection or command failure
function cache.delete(key)
  local red, conn_err = connect_redis()
  if not red then
    return false, conn_err
  end

  local removed, del_err = red:del(key)
  close_redis(red)

  if removed then
    return removed > 0
  end

  ngx.log(ngx.ERR, "failed to delete cache: ", del_err)
  return false, del_err
end
--- Read several keys with one MGET.
--
-- An empty key list returns an empty table without contacting Redis (MGET
-- requires at least one key). Each present value is JSON-decoded when
-- possible, otherwise kept as the raw string; missing keys are omitted.
--
-- @param keys array of cache keys
-- @return table mapping key -> value; `nil, err` on failure
function cache.mget(keys)
  if type(keys) ~= "table" or #keys == 0 then
    return {}
  end

  local red, err = connect_redis()
  if not red then
    return nil, err
  end

  local values, mget_err = red:mget(unpack(keys))
  close_redis(red)

  if not values then
    ngx.log(ngx.ERR, "failed to mget cache: ", mget_err)
    return nil, mget_err
  end

  local result = {}
  for i, value in ipairs(values) do
    if value ~= ngx.null then
      local ok, decoded = pcall(cjson.decode, value)
      -- Explicit branch: `ok and decoded or value` would drop a decoded
      -- `false` and return the raw string instead.
      if ok then
        result[keys[i]] = decoded
      else
        result[keys[i]] = value
      end
    end
  end

  return result
end
-- Usage example
local user_data = {
  id = 123,
  name = "John Doe",
  email = "john@example.com",
}

-- Cache the record; the third argument (300) is the expiry, i.e. 5 minutes.
cache.set("user:123", user_data, 300)

-- Read it back.
local cached_user = cache.get("user:123")
if not cached_user then
  ngx.say("User not found in cache")
else
  ngx.say("Cached user: ", cjson.encode(cached_user))
end
3.3 分布式锁
local redis = require "resty.redis"
-- Distributed lock implementation: the functions below are attached to this table.
local lock = {}
--- Try to take the lock `key`, retrying a few times.
--
-- The lock is a Redis key "lock:<key>" written with SET NX EX. Its value is
-- an ownership token that must be passed to `lock.release`. The token
-- combines the current time, the worker pid and a random number; time and
-- pid alone repeat for two requests handled by the same worker within the
-- same second, which would let one holder release the other's lock.
--
-- @param key resource name
-- @param timeout lock expiry in seconds (default 10)
-- @param retry_times number of attempts (default 3), 100 ms apart
-- @return `true, token` on success; `false, err` otherwise
function lock.acquire(key, timeout, retry_times)
  timeout = timeout or 10
  retry_times = retry_times or 3

  local lock_key = "lock:" .. key
  local lock_value = string.format("%.3f:%d:%d",
    ngx.now(), ngx.worker.pid(), math.random(1, 2147483647))

  for attempt = 1, retry_times do
    local red, err = connect_redis()
    if not red then
      return false, err
    end

    -- SET NX EX is atomic: the key is only written when it does not exist.
    local ok, set_err = red:set(lock_key, lock_value, "NX", "EX", timeout)
    close_redis(red)

    if ok and ok ~= ngx.null then
      return true, lock_value
    end

    -- A falsy reply is a Redis failure, not contention; make it visible.
    if not ok then
      ngx.log(ngx.ERR, "failed to acquire lock ", lock_key, ": ", set_err)
    end

    if attempt < retry_times then
      ngx.sleep(0.1) -- wait 100 ms before retrying
    end
  end

  return false, "Failed to acquire lock after " .. retry_times .. " attempts"
end
--- Release the lock `key` if it is still held with `lock_value`.
-- The compare-and-delete runs as one server-side script, so the key is only
-- deleted when its current value equals `lock_value`.
-- @param key resource name used in lock.acquire
-- @param lock_value token returned by lock.acquire
-- @return `true` if the key was deleted, `false` if not;
--         `false, err` on connection or script failure
function lock.release(key, lock_value)
  local red, conn_err = connect_redis()
  if not red then
    return false, conn_err
  end

  -- The script text is kept unindented so it is sent exactly as before.
  local script = [[
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
]]

  local deleted, eval_err = red:eval(script, 1, "lock:" .. key, lock_value)
  close_redis(red)

  if deleted then
    return deleted == 1
  end

  ngx.log(ngx.ERR, "failed to release lock: ", eval_err)
  return false, eval_err
end
--- Example of guarding work with the distributed lock.
-- Takes the "critical_resource" lock with a 30 second expiry, sleeps for one
-- second as a stand-in for real work, then releases the lock.
-- @return `true, "Operation completed"`, or `false, "Resource is busy"` when
--         the lock could not be taken
local function critical_section()
  local resource = "critical_resource"

  -- On failure the second return value carries the error message.
  local lock_acquired, lock_value = lock.acquire(resource, 30)
  if not lock_acquired then
    ngx.log(ngx.WARN, "Failed to acquire lock: ", lock_value)
    return false, "Resource is busy"
  end

  ngx.log(ngx.INFO, "Executing critical section")
  ngx.sleep(1) -- simulate some work

  if not lock.release(resource, lock_value) then
    ngx.log(ngx.WARN, "Failed to release lock")
  end

  return true, "Operation completed"
end
-- Example call: print the result with a prefix that reflects the outcome.
local success, message = critical_section()
local prefix = success and "Success: " or "Error: "
ngx.say(prefix, message)
3.4 发布订阅
local redis = require "resty.redis"
local cjson = require "cjson"
--- Publish a message to a Redis channel.
-- The payload is wrapped in a JSON envelope with `timestamp`, `data` and
-- `source` fields before being published.
-- @param channel channel name
-- @param message value placed in the envelope's `data` field
-- @return `true, subscribers` (the PUBLISH reply) on success;
--         `false, err` otherwise
local function publish_message(channel, message)
  local red, conn_err = connect_redis()
  if not red then
    return false, conn_err
  end

  local envelope = {
    timestamp = ngx.time(),
    data = message,
    source = "openresty",
  }
  local serialized_message = cjson.encode(envelope)

  local subscribers, pub_err = red:publish(channel, serialized_message)
  close_redis(red)

  if subscribers then
    ngx.log(ngx.INFO, "Message published to ", subscribers, " subscribers")
    return true, subscribers
  end

  ngx.log(ngx.ERR, "failed to publish message: ", pub_err)
  return false, pub_err
end
--- Subscribe to Redis channels and dispatch every message to `callback`.
--
-- This function blocks in a read loop, so run it in a context where
-- cosockets are available, e.g. from a timer started in init_worker:
--   ngx.timer.at(0, function() subscribe_messages(channels, handler) end)
-- Cosockets cannot be used directly inside init_worker_by_lua*.
--
-- The loop keeps running across read timeouts (an idle channel is not an
-- error), stops when the worker is exiting or on any other read error, and
-- isolates callback errors so one bad message does not end the subscription.
--
-- @param channels array of channel names
-- @param callback function(channel, message); `message` is the decoded JSON
--        payload, or `{data = raw}` when the payload is not valid JSON
local function subscribe_messages(channels, callback)
  local red, err = connect_redis()
  if not red then
    ngx.log(ngx.ERR, "failed to connect for subscription: ", err)
    return
  end

  local ok, sub_err = red:subscribe(unpack(channels))
  if not ok then
    ngx.log(ngx.ERR, "failed to subscribe: ", sub_err)
    red:close()
    return
  end

  while not ngx.worker.exiting() do
    local res, read_err = red:read_reply()
    if res then
      if res[1] == "message" then
        local channel = res[2]
        local message = res[3]

        -- Decode the payload; fall back to wrapping the raw string.
        local decoded_ok, parsed_message = pcall(cjson.decode, message)
        if not decoded_ok then
          parsed_message = { data = message }
        end

        local cb_ok, cb_err = pcall(callback, channel, parsed_message)
        if not cb_ok then
          ngx.log(ngx.ERR, "subscription callback failed on ", channel, ": ", cb_err)
        end
      end
    elseif read_err ~= "timeout" then
      ngx.log(ngx.ERR, "failed to read reply: ", read_err)
      break
    end
    -- On "timeout" simply poll again.
  end

  -- A connection in subscribed state must not go back to the pool.
  red:close()
end
--- Subscription callback: log the message and react per channel.
--
-- `message.data` is only a table when the publisher sent the JSON envelope;
-- for any other shape (raw string payload, missing `data`, non-table
-- message) an empty table is used so the field lookups below cannot raise.
--
-- @param channel channel the message arrived on
-- @param message decoded message
local function message_handler(channel, message)
  ngx.log(ngx.INFO, "Received message on ", channel, ": ", cjson.encode(message))

  local data = type(message) == "table" and message.data or nil
  if type(data) ~= "table" then
    data = {}
  end

  -- Handle each channel's message type.
  if channel == "user_events" then
    if data.action == "login" then
      ngx.log(ngx.INFO, "User ", data.user_id, " logged in")
    end
  elseif channel == "system_alerts" then
    ngx.log(ngx.WARN, "System alert: ", data.alert)
  end
end
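-- Start the subscription from init_worker_by_lua*. Cosockets cannot be used
-- directly in that phase, so launch the subscriber from a zero-delay timer:
-- ngx.timer.at(0, function()
--   subscribe_messages({"user_events", "system_alerts"}, message_handler)
-- end)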
-- Example: publish a login event and report the result to the client.
local login_event = {
  action = "login",
  user_id = 123,
  ip = ngx.var.remote_addr,
}
local success, subscribers = publish_message("user_events", login_event)
if not success then
  ngx.say("Failed to publish message")
else
  ngx.say("Message published to ", subscribers, " subscribers")
end
4. PostgreSQL操作
4.1 基础连接
local postgres = require "resty.postgres"
--- Create a resty.postgres object and connect it with the tutorial defaults.
-- NOTE(review): whether the driver honours `pool_size` / `backlog` is not
-- visible from this file; confirm against the library documentation.
-- @return the connected object on success; `nil, err` otherwise
local function connect_postgres()
  local conn = postgres:new()
  conn:set_timeout(5000)

  local options = {
    host = "127.0.0.1",
    port = 5432,
    database = "test_db",
    user = "postgres",
    password = "password",
    pool = "postgres_pool",
    pool_size = 50,
    backlog = 30,
  }

  local connected, conn_err = conn:connect(options)
  if connected then
    return conn
  end

  ngx.log(ngx.ERR, "failed to connect to postgres: ", conn_err)
  return nil, conn_err
end
--- Return a PostgreSQL connection to the keepalive pool.
-- A nil/false `pg` is ignored. When pooling fails the error is logged and
-- the connection is closed instead.
-- @param pg connection object obtained from connect_postgres
local function close_postgres(pg)
  if pg then
    local pooled, pool_err = pg:set_keepalive(30000, 50)
    if not pooled then
      ngx.log(ngx.ERR, "failed to set keepalive: ", pool_err)
      pg:close()
    end
  end
end
4.2 基本操作
-- PostgreSQL user management: the functions below are attached to this table.
local pg_users = {}
--- Insert a user and return the generated id.
-- NOTE(review): this passes the values as extra arguments to `pg:query` for
-- the `$1..$3` markers; confirm that the resty.postgres version in use
-- supports bound parameters.
-- @param user_data table with `name`, `email` and `age`
-- @return the new id (from RETURNING) on success; `nil, err` otherwise
function pg_users.create(user_data)
  local pg, conn_err = connect_postgres()
  if not pg then
    return nil, conn_err
  end

  -- The statement text is kept unindented so it is sent exactly as before.
  local sql = [[
INSERT INTO users (name, email, age, created_at)
VALUES ($1, $2, $3, NOW())
RETURNING id
]]

  local rows, query_err = pg:query(sql, user_data.name, user_data.email, user_data.age)
  close_postgres(pg)

  if rows then
    return rows[1].id
  end

  ngx.log(ngx.ERR, "failed to create user: ", query_err)
  return nil, query_err
end
--- Fetch a single user row by id.
-- NOTE(review): relies on `pg:query` binding `user_id` to `$1`; confirm that
-- the resty.postgres version in use supports bound parameters.
-- @param user_id id of the user
-- @return the row on success; `nil, "User not found"` when no row matches;
--         `nil, err` on connection or query failure
function pg_users.get(user_id)
  local pg, conn_err = connect_postgres()
  if not pg then
    return nil, conn_err
  end

  local rows, query_err = pg:query(
    "SELECT id, name, email, age, created_at FROM users WHERE id = $1", user_id)
  close_postgres(pg)

  if not rows then
    ngx.log(ngx.ERR, "failed to get user: ", query_err)
    return nil, query_err
  end

  if #rows ~= 0 then
    return rows[1]
  end
  return nil, "User not found"
end
--- Merge `profile_data` into the user's JSONB `profile` column.
--
-- The existing profile is wrapped in COALESCE because `NULL || jsonb`
-- evaluates to NULL in PostgreSQL: without it, a user whose profile is NULL
-- would keep a NULL profile no matter what is merged in.
--
-- NOTE(review): relies on `pg:query` binding the extra arguments to
-- `$1`/`$2`; confirm that the resty.postgres version in use supports bound
-- parameters.
--
-- @param user_id id of the user
-- @param profile_data table of keys to merge into the profile
-- @return the updated profile as returned by the driver (nil when no row
--         matched); `nil, err` on connection or query failure
function pg_users.update_profile(user_id, profile_data)
  local pg, err = connect_postgres()
  if not pg then
    return nil, err
  end

  local sql = [[
UPDATE users
SET profile = COALESCE(profile, '{}'::jsonb) || $2::jsonb, updated_at = NOW()
WHERE id = $1
RETURNING profile
]]

  local profile_json = require("cjson").encode(profile_data)
  local res, query_err = pg:query(sql, user_id, profile_json)
  close_postgres(pg)

  if not res then
    ngx.log(ngx.ERR, "failed to update profile: ", query_err)
    return nil, query_err
  end

  return res[1] and res[1].profile
end
-- Export the PostgreSQL user management table.
return pg_users
5. 数据库连接池管理
5.1 连接池配置
-- Database connection manager: connection helpers are attached to this table.
local db_manager = {}
-- Per-backend settings read by the db_manager functions below:
--   host/port/database/user/password/charset: passed to the driver's connect
--   pool, pool_size, backlog: connection pool options passed to connect
--   timeout: value passed to set_timeout
--   keepalive_timeout, keepalive_pool: arguments passed to set_keepalive
-- NOTE(review): pool_size (100) and keepalive_pool (50) differ; confirm which
-- one takes effect for the pool in the OpenResty version in use.
local db_configs = {
mysql = {
host = "127.0.0.1",
port = 3306,
database = "app_db",
user = "app_user",
password = "app_password",
charset = "utf8mb4",
pool = "mysql_pool",
pool_size = 100,
backlog = 50,
timeout = 5000,
keepalive_timeout = 30000,
keepalive_pool = 50
},
redis = {
host = "127.0.0.1",
port = 6379,
-- Redis database index; get_redis only issues SELECT when this is not 0.
database = 0,
pool = "redis_pool",
pool_size = 100,
backlog = 50,
timeout = 5000,
keepalive_timeout = 30000,
keepalive_pool = 50
}
}
--- Open a MySQL connection using `db_configs.mysql`.
-- @return the connected object on success; `nil, err` otherwise
function db_manager.get_mysql()
  local mysql = require "resty.mysql"
  local cfg = db_configs.mysql

  local conn, init_err = mysql:new()
  if not conn then
    return nil, init_err
  end

  conn:set_timeout(cfg.timeout)

  -- Only the connect-related settings are forwarded to the driver.
  local options = {
    host = cfg.host,
    port = cfg.port,
    database = cfg.database,
    user = cfg.user,
    password = cfg.password,
    charset = cfg.charset,
    pool = cfg.pool,
    pool_size = cfg.pool_size,
    backlog = cfg.backlog,
  }

  local connected, conn_err = conn:connect(options)
  if connected then
    return conn
  end
  return nil, conn_err
end
--- Return a MySQL connection to the pool using the configured keepalive
-- settings. A nil/false `db` is ignored; when pooling fails the error is
-- logged and the connection is closed.
-- @param db connection object obtained from db_manager.get_mysql
function db_manager.close_mysql(db)
  if db then
    local cfg = db_configs.mysql
    local pooled, pool_err = db:set_keepalive(cfg.keepalive_timeout, cfg.keepalive_pool)
    if not pooled then
      ngx.log(ngx.ERR, "failed to set mysql keepalive: ", pool_err)
      db:close()
    end
  end
end
--- Open a Redis connection using `db_configs.redis`.
--
-- SELECT is only issued when the configured database is not 0. If SELECT
-- fails, the connection is closed before returning so it is not leaked.
--
-- @return the connected object on success; `nil, err` otherwise
function db_manager.get_redis()
  local redis = require "resty.redis"
  local config = db_configs.redis

  local red, new_err = redis:new()
  if not red then
    return nil, new_err
  end

  red:set_timeout(config.timeout)

  local ok, err = red:connect(config.host, config.port, {
    pool = config.pool,
    pool_size = config.pool_size,
    backlog = config.backlog
  })
  if not ok then
    return nil, err
  end

  if config.database ~= 0 then
    local res, select_err = red:select(config.database)
    if not res then
      red:close()
      return nil, select_err
    end
  end

  return red
end
--- Return a Redis connection to the pool using the configured keepalive
-- settings. A nil/false `red` is ignored; when pooling fails the error is
-- logged and the connection is closed.
-- @param red connection object obtained from db_manager.get_redis
function db_manager.close_redis(red)
  if red then
    local cfg = db_configs.redis
    local pooled, pool_err = red:set_keepalive(cfg.keepalive_timeout, cfg.keepalive_pool)
    if not pooled then
      ngx.log(ngx.ERR, "failed to set redis keepalive: ", pool_err)
      red:close()
    end
  end
end
--- Probe both backends.
-- MySQL is considered healthy when `SELECT 1` returns a result; Redis when
-- PING answers "PONG". A backend that cannot be connected stays `false`.
-- @return table `{mysql = boolean, redis = boolean}`
function db_manager.health_check()
  local health = { mysql = false, redis = false }

  -- MySQL probe
  local db = db_manager.get_mysql()
  if db then
    if db:query("SELECT 1") then
      health.mysql = true
    end
    db_manager.close_mysql(db)
  end

  -- Redis probe
  local red = db_manager.get_redis()
  if red then
    local reply = red:ping()
    if reply == "PONG" then
      health.redis = true
    end
    db_manager.close_redis(red)
  end

  return health
end
-- Export the connection manager table.
return db_manager
5.2 连接池监控
--- Collect the pool counters stored in the `my_stats` shared dictionary.
-- Only keys starting with "pool_" are returned. When the dictionary does not
-- exist the result is an empty table.
-- @return table mapping counter key -> value
local function get_pool_stats()
  local stats = {}

  local dict = ngx.shared.my_stats
  if not dict then
    return stats
  end

  -- get_keys(0) asks for every key in the dictionary.
  for _, name in ipairs(dict:get_keys(0)) do
    if string.find(name, "^pool_") then
      stats[name] = dict:get(name)
    end
  end

  return stats
end
--- Count one pool event in the `my_stats` shared dictionary.
-- The counter key is "pool_<pool_name>_<action>". Does nothing when the
-- dictionary does not exist.
-- @param pool_name pool identifier, e.g. "mysql"
-- @param action event name, e.g. "get" or "close"
local function record_pool_usage(pool_name, action)
  local dict = ngx.shared.my_stats
  if dict then
    -- Increment by 1, starting from 0, with 3600 (one hour) as the initial expiry.
    dict:incr("pool_" .. pool_name .. "_" .. action, 1, 0, 3600)
  end
end
-- Wrap the MySQL get/close helpers so that every call is counted before
-- being forwarded to the original implementation.
local original_get_mysql = db_manager.get_mysql
db_manager.get_mysql = function()
  record_pool_usage("mysql", "get")
  return original_get_mysql()
end

local original_close_mysql = db_manager.close_mysql
db_manager.close_mysql = function(db)
  record_pool_usage("mysql", "close")
  return original_close_mysql(db)
end
-- 监控接口
# Monitoring endpoint: responds with backend health and pool counters as JSON.
location /db-stats {
content_by_lua_block {
-- NOTE(review): get_pool_stats and db_manager are declared `local` in the
-- Lua snippets of this document, so they are not visible in this chunk
-- as written; they presumably need to be exposed by a module and loaded
-- here with require. Confirm before use.
local stats = get_pool_stats()
local health = db_manager.health_check()
-- Response body: health flags, pool counters and the current time.
local response = {
health = health,
pool_stats = stats,
timestamp = ngx.time()
}
ngx.header.content_type = "application/json"
ngx.say(require("cjson").encode(response))
}
}
6. 总结
OpenResty提供了丰富的数据库连接库,支持MySQL、PostgreSQL、Redis等主流数据库。通过合理使用连接池、事务处理、错误处理等机制,我们可以构建高性能、高可用的数据库应用。掌握这些数据库操作技能是开发现代Web应用的重要基础。