1. lua-resty-http简介
1.1 什么是lua-resty-http
lua-resty-http是OpenResty生态中最重要的HTTP客户端库,它提供了完整的HTTP/HTTPS客户端功能,支持连接池、SSL、代理等高级特性。
1.2 主要特性
- 连接复用:自动管理连接池
- SSL/TLS支持:完整的HTTPS支持
- 流式处理:支持大文件上传下载
- 代理支持:HTTP/HTTPS代理
- 认证支持:Basic Auth、Bearer Token等
- 超时控制:精确的超时管理
- 重试机制:库本身不内置自动重试,但可以在其之上方便地封装失败重试逻辑(见3.3节)
1.3 安装和配置
# Install via opm
opm get ledgetech/lua-resty-http
# Or download the source archive manually
wget https://github.com/ledgetech/lua-resty-http/archive/master.zip
# nginx.conf configuration
http {
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";

    # DNS resolver used to resolve upstream host names
    resolver 8.8.8.8 8.8.4.4 valid=300s;
    resolver_timeout 5s;

    # CA bundle and maximum chain depth used when verifying upstream TLS
    # certificates. Without a trusted CA file, verified HTTPS requests fail
    # during the handshake. The path is distribution-specific (this is the
    # Debian/Ubuntu location) -- adjust it for your system.
    lua_ssl_trusted_certificate /etc/ssl/certs/ca-certificates.crt;
    lua_ssl_verify_depth 2;

    # Shared memory zones accessed as ngx.shared.my_stats (section 5.2)
    # and ngx.shared.my_cache (section 7.2).
    lua_shared_dict my_stats 10m;
    lua_shared_dict my_cache 10m;
}
2. 基本用法
2.1 简单GET请求
local http = require "resty.http"

local httpc = http.new()

-- Basic GET request.
-- request_uri() connects, sends the request, reads the whole body and then
-- returns the connection to the keepalive pool (or closes it) by itself, so
-- no explicit close() is needed afterwards.
local res, err = httpc:request_uri("https://api.example.com/users")
if not res then
    ngx.log(ngx.ERR, "request failed: ", err)
    return
end

ngx.say("Status: ", res.status)
ngx.say("Body: ", res.body)
2.2 带参数的GET请求
local http = require "resty.http"

local httpc = http.new()

-- Query parameters shared by both variants below
local args = {
    page = 1,
    limit = 10,
    sort = "created_at",
}

-- Variant 1: build the URL by hand
local url = "https://api.example.com/users?" .. ngx.encode_args(args)
local res, err = httpc:request_uri(url)
if res then
    ngx.say(res.body)
else
    ngx.log(ngx.ERR, "request failed: ", err)
end

-- Variant 2: pass the table through the `query` option and let the
-- library encode it. Distinct local names keep the two results apart.
local res2, err2 = httpc:request_uri("https://api.example.com/users", {
    method = "GET",
    query = args,
})
if res2 then
    ngx.say(res2.body)
else
    ngx.log(ngx.ERR, "request failed: ", err2)
end

-- No close() here: request_uri() releases the connection itself.
2.3 POST请求
local http = require "resty.http"
local cjson = require "cjson"

local httpc = http.new()

-- Serialise the new user record as the JSON request body
local payload = cjson.encode({
    name = "John Doe",
    email = "john@example.com",
    age = 30,
})

local request = {
    method = "POST",
    body = payload,
    headers = {
        ["Content-Type"] = "application/json",
        ["Authorization"] = "Bearer your-token-here",
    },
}

local res, err = httpc:request_uri("https://api.example.com/users", request)
if not res then
    ngx.log(ngx.ERR, "request failed: ", err)
    return
end

-- Only 201 is reported as success; any other status is printed as a failure
local created = (res.status == 201)
if not created then
    ngx.say("Failed to create user: ", res.status)
else
    ngx.say("User created successfully")
    ngx.say(res.body)
end

httpc:close()
2.4 表单数据提交
local http = require "resty.http"
local cjson = require "cjson"

local httpc = http.new()

-- Form fields, sent URL-encoded in the request body
local form_data = {
    username = "john",
    password = "secret123",
    remember = "1",
}

local res, err = httpc:request_uri("https://api.example.com/login", {
    method = "POST",
    body = ngx.encode_args(form_data),
    headers = {
        ["Content-Type"] = "application/x-www-form-urlencoded",
    },
})

-- Transport-level failure (DNS, connect, timeout, ...)
if not res then
    ngx.log(ngx.ERR, "login request failed: ", err)
    return
end

if res.status ~= 200 then
    ngx.log(ngx.WARN, "login rejected with status ", res.status)
    return
end

-- cjson.decode raises on malformed input, so guard it with pcall and make
-- sure the result is a table before reading fields from it.
local ok, response_data = pcall(cjson.decode, res.body)
if not ok or type(response_data) ~= "table" then
    ngx.log(ngx.ERR, "login response is not a JSON object")
    return
end

ngx.say("Login successful, token: ", response_data.token)
3. 高级功能
3.1 连接池管理
local http = require "resty.http"

-- Create the HTTP client
local httpc = http.new()

-- Connect using the single-table form of connect() (lua-resty-http >= 0.16).
-- Only this form performs the TLS handshake for scheme = "https". The older
-- connect(host, port, opts) form opens a plain TCP socket and does not act
-- on `scheme` or `ssl_verify`, so the request would go out unencrypted.
local connected, connect_err = httpc:connect({
    scheme = "https",
    host = "api.example.com",
    port = 443,
    ssl_verify = true,
    pool = "api_pool",   -- connection pool name
    pool_size = 100,     -- connection pool size
    backlog = 50,        -- wait-queue size
})
if not connected then
    ngx.log(ngx.ERR, "failed to connect: ", connect_err)
    return
end

-- Send the request
local res, request_err = httpc:request({
    method = "GET",
    path = "/api/users",
    headers = {
        ["Host"] = "api.example.com",
        ["User-Agent"] = "OpenResty/1.21",
    },
})
if not res then
    ngx.log(ngx.ERR, "request failed: ", request_err)
    -- The connection state is unknown after a failed request: do not pool it
    httpc:close()
    return
end

-- Read the response body. It has to be consumed completely before the
-- connection can be handed back for reuse.
local body, read_err = res:read_body()
if not body then
    ngx.log(ngx.ERR, "failed to read body: ", read_err)
    httpc:close()
    return
end
ngx.say(body)

-- Put the connection back into the pool: 30 s idle timeout, up to 100 connections
local kept, keepalive_err = httpc:set_keepalive(30000, 100)
if not kept then
    ngx.log(ngx.ERR, "failed to set keepalive: ", keepalive_err)
end
3.2 超时控制
local http = require "resty.http"

local httpc = http.new()

-- 5 s timeout. It applies to each socket operation (connect, send, read)
-- separately; it is not a deadline for the request as a whole.
httpc:set_timeout(5000)

local res, err = httpc:request_uri("https://slow-api.example.com/data", {
    method = "GET",
    headers = {
        ["Accept"] = "application/json",
    },
})

if not res then
    if err == "timeout" then
        -- The upstream did not answer in time
        ngx.log(ngx.WARN, "Request timed out")
        ngx.status = 504
        ngx.say("Gateway Timeout")
    else
        -- Any other transport failure
        ngx.log(ngx.ERR, "Request failed: ", err)
        ngx.status = 502
        ngx.say("Bad Gateway")
    end
    return
end

-- No close() here: request_uri() releases the connection itself.
ngx.say(res.body)
3.3 重试机制
local http = require "resty.http"
--- Perform a request through request_uri(), retrying with exponential back-off.
-- Any response counts as success regardless of its status code; only calls
-- for which request_uri() returns no response are retried.
-- @param url          URL passed to request_uri()
-- @param options      option table forwarded unchanged to request_uri()
-- @param max_retries  maximum number of attempts (default 3)
-- @return the response table and nil on success
-- @return nil and an error message once every attempt has failed
local function make_request_with_retry(url, options, max_retries)
    max_retries = max_retries or 3

    local last_err
    for attempt = 1, max_retries do
        local httpc = http.new()
        httpc:set_timeout(5000)

        -- request_uri() releases the connection on success and closes it on
        -- failure, so no explicit close() is needed on either path.
        local res, err = httpc:request_uri(url, options)
        if res then
            return res, nil
        end

        last_err = err
        -- Log immediately so the entry carries the time of the failure
        -- rather than the time the back-off sleep finished.
        ngx.log(ngx.WARN, "Request attempt ", attempt, " failed: ", err)

        -- Exponential back-off between attempts: 1 s, 2 s, 4 s ... capped at 10 s
        if attempt < max_retries then
            ngx.sleep(math.min(2 ^ (attempt - 1), 10))
        end
    end

    return nil, "Max retries exceeded. Last error: " .. tostring(last_err or "unknown")
end
-- Example: call an unreliable endpoint through the retry helper
local request_opts = {
    method = "GET",
    headers = {
        ["Authorization"] = "Bearer token",
    },
}

local res, err = make_request_with_retry("https://unreliable-api.example.com/data", request_opts)

if not res then
    -- Every attempt failed: report an upstream error to the client
    ngx.log(ngx.ERR, "All retries failed: ", err)
    ngx.status = 502
    ngx.say("Service temporarily unavailable")
else
    ngx.say("Success: ", res.body)
end
3.4 流式处理
local http = require "resty.http"

local httpc = http.new()

-- Connect with the single-table form of connect() (lua-resty-http >= 0.16);
-- the positional connect(host, port, opts) form does not perform the TLS
-- handshake.
local connected, connect_err = httpc:connect({
    scheme = "https",
    host = "api.example.com",
    port = 443,
})
if not connected then
    ngx.log(ngx.ERR, "failed to connect: ", connect_err)
    return
end

-- Send the request
local res, request_err = httpc:request({
    method = "GET",
    path = "/api/large-file",
    headers = {
        ["Host"] = "api.example.com",
    },
})
if not res then
    ngx.log(ngx.ERR, "request failed: ", request_err)
    httpc:close()
    return
end

-- Stream the body through res.body_reader, an iterator that yields at most
-- the requested number of bytes per call. res:read_body() is not suitable
-- here: it takes no size argument and always buffers the complete body.
local reader = res.body_reader
local total_size = 0

if reader then
    repeat
        local chunk, read_err = reader(8192)  -- up to 8 KB per read
        if read_err then
            ngx.log(ngx.ERR, "read body failed: ", read_err)
            break
        end

        if chunk then
            total_size = total_size + #chunk
            ngx.print(chunk)   -- forward the chunk to the client
            ngx.flush(true)    -- and flush it right away
        end
    until not chunk            -- a nil chunk marks the end of the body
end

ngx.log(ngx.INFO, "Total bytes read: ", total_size)
httpc:close()
4. 认证和安全
4.1 Basic认证
local http = require "resty.http"

local httpc = http.new()

local username = "admin"
local password = "secret"

-- Build the value of a Basic `Authorization` header from a user name and
-- password.
local function basic_auth_header(user, pass)
    return "Basic " .. ngx.encode_base64(user .. ":" .. pass)
end

-- lua-resty-http has no `auth` request option: an `auth = {...}` table is
-- not turned into credentials. Basic authentication is done by setting the
-- Authorization header explicitly, either inline or via a small helper
-- such as the one above.
local res, err = httpc:request_uri("https://api.example.com/protected", {
    method = "GET",
    headers = {
        ["Authorization"] = basic_auth_header(username, password),
    },
})

if not res then
    -- Transport failure: log the cause instead of only reporting "failed"
    ngx.log(ngx.ERR, "request failed: ", err)
    ngx.say("Authentication failed")
    return
end

if res.status == 200 then
    ngx.say("Authentication successful")
    ngx.say(res.body)
else
    ngx.say("Authentication failed")
end
4.2 Bearer Token认证
local http = require "resty.http"
local cjson = require "cjson"
--- Request an access token with the client-credentials grant.
-- @return the access token on success
-- @return nil and an error message when the request fails, the status is
--         not 200, or the response does not contain a usable token
local function get_access_token()
    local httpc = http.new()
    local res, err = httpc:request_uri("https://auth.example.com/oauth/token", {
        method = "POST",
        body = ngx.encode_args({
            grant_type = "client_credentials",
            client_id = "your_client_id",
            client_secret = "your_client_secret",
            scope = "api:read api:write",
        }),
        headers = {
            ["Content-Type"] = "application/x-www-form-urlencoded",
        },
    })

    if not res then
        return nil, "token request failed: " .. tostring(err)
    end
    if res.status ~= 200 then
        return nil, "token endpoint returned HTTP " .. tostring(res.status)
    end

    -- cjson.decode raises on malformed input; guard it so a bad response
    -- becomes an error return instead of aborting the request handler.
    local ok, token_data = pcall(cjson.decode, res.body)
    if not ok or type(token_data) ~= "table" then
        return nil, "token response is not a JSON object"
    end

    local token = token_data.access_token
    if not token then
        return nil, "token response has no access_token"
    end
    return token
end
-- Issue a GET against the API, authenticated with a bearer token.
--   endpoint: path appended to the API base URL
--   token:    value placed after "Bearer " in the Authorization header
-- Returns exactly what request_uri() returned (response, error).
local function call_api_with_token(endpoint, token)
    local client = http.new()

    local target = "https://api.example.com" .. endpoint
    local request_headers = {
        ["Authorization"] = "Bearer " .. token,
        ["Accept"] = "application/json",
    }

    local res, err = client:request_uri(target, {
        method = "GET",
        headers = request_headers,
    })
    client:close()

    return res, err
end
-- Main flow: obtain a token first, then call the API with it
local token = get_access_token()
if not token then
    ngx.status = 401
    ngx.say("Failed to obtain access token")
    return
end

local res, err = call_api_with_token("/api/users", token)
if not res then
    -- The API call itself failed
    ngx.status = 502
    ngx.say("API call failed: ", err)
else
    -- Relay the API response body as JSON
    ngx.header.content_type = "application/json"
    ngx.say(res.body)
end
4.3 SSL/TLS配置
local http = require "resty.http"
local ssl = require "ngx.ssl"

-- Read a whole file. Returns its contents, or nil plus an error message.
local function read_file(path)
    local file, err = io.open(path, "rb")
    if not file then
        return nil, err
    end
    local data = file:read("*a")
    file:close()
    return data
end

-- Load and parse the client certificate and private key (mutual TLS).
-- NOTE: io.open is blocking file I/O; outside of a demo, load and parse
-- these once and cache the parsed objects instead of doing it per request.
local cert_pem, cert_read_err = read_file("/path/to/client.crt")
if not cert_pem then
    ngx.log(ngx.ERR, "failed to read client certificate: ", cert_read_err)
    return
end
local key_pem, key_read_err = read_file("/path/to/client.key")
if not key_pem then
    ngx.log(ngx.ERR, "failed to read client key: ", key_read_err)
    return
end

local client_cert, cert_err = ssl.parse_pem_cert(cert_pem)
if not client_cert then
    ngx.log(ngx.ERR, "failed to parse client certificate: ", cert_err)
    return
end
local client_key, key_err = ssl.parse_pem_priv_key(key_pem)
if not client_key then
    ngx.log(ngx.ERR, "failed to parse client key: ", key_err)
    return
end

local httpc = http.new()

-- TLS settings are plain fields of the connect() option table; there is no
-- nested `ssl_opts` table. The certificate chain verification depth is not
-- a per-connection option: it is set with the `lua_ssl_verify_depth`
-- directive in nginx.conf.
local connected, connect_err = httpc:connect({
    scheme = "https",
    host = "api.example.com",
    port = 443,
    ssl_verify = true,                     -- verify the server certificate
    ssl_server_name = "api.example.com",   -- SNI server name
    ssl_client_cert = client_cert,         -- client certificate
    ssl_client_priv_key = client_key,      -- client private key
})
if not connected then
    ngx.log(ngx.ERR, "SSL connection failed: ", connect_err)
    return
end

-- Send the request
local res, request_err = httpc:request({
    method = "GET",
    path = "/api/secure-data",
    headers = {
        ["Host"] = "api.example.com",
    },
})
if not res then
    ngx.log(ngx.ERR, "request failed: ", request_err)
    httpc:close()
    return
end

local body, read_err = res:read_body()
if body then
    ngx.say(body)
else
    ngx.log(ngx.ERR, "failed to read body: ", read_err)
end

httpc:close()
5. 错误处理和监控
5.1 完整的错误处理
local http = require "resty.http"
local cjson = require "cjson"
--- Turn the outcome of an HTTP call into a uniform result table.
-- @param res      response table (its `status` and `body` are read), or nil
--                 when the call produced no response
-- @param err      error value that accompanied a nil `res`
-- @param context  label included in the log lines
-- @return table with `success = true`, `data`, `status` for statuses below
--         400; otherwise `success = false` plus `error`, `message` and,
--         when a response exists, `status` (and `details` for 4xx)
local function handle_api_error(res, err, context)
    if not res then
        -- No response at all: network-level failure
        ngx.log(ngx.ERR, "Network error in ", context, ": ", err)
        return {
            success = false,
            error = "network_error",
            message = "Network connection failed",
        }
    end

    local status = res.status

    if status >= 500 then
        -- Server-side error
        ngx.log(ngx.ERR, "Server error in ", context, ": ", status)
        return {
            success = false,
            error = "server_error",
            message = "Internal server error",
            status = status,
        }
    end

    if status >= 400 then
        -- Client-side error: try to extract a message from a JSON body
        local error_body = res.body

        -- cjson.decode may raise (malformed JSON) or return something that
        -- is not a table (a number, a string, cjson.null); only a table can
        -- safely be indexed for `.message`.
        local ok, parsed = pcall(cjson.decode, error_body)
        local error_data
        if ok and type(parsed) == "table" then
            error_data = parsed
        else
            error_data = { message = error_body }
        end

        ngx.log(ngx.WARN, "Client error in ", context, ": ", status, " ", error_body)
        return {
            success = false,
            error = "client_error",
            message = error_data.message or "Bad request",
            status = status,
            details = error_data,
        }
    end

    -- Anything below 400 is treated as success
    return {
        success = true,
        data = res.body,
        status = status,
    }
end
-- Fetch a single user record and normalise the outcome with
-- handle_api_error(). get_auth_token() is not defined in this snippet and
-- has to be provided by the surrounding code.
local function get_user_data(user_id)
    local client = http.new()
    client:set_timeout(5000)

    local endpoint = "https://api.example.com/users/" .. user_id
    local request_headers = {
        ["Authorization"] = "Bearer " .. get_auth_token(),
        ["Accept"] = "application/json",
    }

    local res, err = client:request_uri(endpoint, {
        method = "GET",
        headers = request_headers,
    })
    client:close()

    return handle_api_error(res, err, "get_user_data")
end
-- Call the API and render the outcome as a JSON document
local result = get_user_data(123)

local payload
if not result.success then
    -- Propagate the upstream status when there is one, otherwise answer 500
    ngx.status = result.status or 500
    payload = {
        success = false,
        error = result.error,
        message = result.message,
    }
else
    payload = {
        success = true,
        user = cjson.decode(result.data),
    }
end

ngx.header.content_type = "application/json"
ngx.say(cjson.encode(payload))
5.2 性能监控
local http = require "resty.http"
local cjson = require "cjson"
--- Run `func(...)` while recording call statistics in ngx.shared.my_stats.
-- Counters are kept under the keys "api_calls:", "api_time:", "api_errors:"
-- and "api_slow:" followed by `name`, each created with a 24 hour TTL.
-- @param name  label used in the statistic keys and in the slow-call log
-- @param func  function to execute
-- @param ...   arguments passed to `func`
-- @return the first value returned by `func`
local function monitor_api_call(name, func, ...)
    local cache = ngx.shared.my_stats
    if not cache then
        -- The shared dict is not configured: run the call unmonitored
        -- instead of failing on a nil index.
        ngx.log(ngx.WARN, "shared dict my_stats is not configured; skipping stats for ", name)
        local result = func(...)
        return result
    end

    -- ngx.now() returns nginx's cached time; refresh it so the measured
    -- duration is not skewed by a stale cache.
    ngx.update_time()
    local start_time = ngx.now()

    -- Count the call
    cache:incr("api_calls:" .. name, 1, 0, 86400)

    -- Execute the wrapped function
    local result = func(...)

    -- Accumulate the elapsed time. incr() is atomic across workers, whereas
    -- a get() followed by set() can lose updates under concurrency.
    ngx.update_time()
    local duration = ngx.now() - start_time
    cache:incr("api_time:" .. name, duration, 0, 86400)

    -- Count failures; a result that is not a table is counted as one too
    if type(result) ~= "table" or not result.success then
        cache:incr("api_errors:" .. name, 1, 0, 86400)
    end

    -- Count and log calls slower than one second
    if duration > 1.0 then
        ngx.log(ngx.WARN, "Slow API call: ", name, " took ", duration, "s")
        cache:incr("api_slow:" .. name, 1, 0, 86400)
    end

    return result
end
-- Fetch a user through monitor_api_call() under the label "get_user".
local function monitored_get_user(user_id)
    -- The monitored operation: returns {success = true, data = ...} for a
    -- 200 response and {success = false, error = ...} otherwise.
    local function fetch_user(uid)
        local client = http.new()
        local res, err = client:request_uri("https://api.example.com/users/" .. uid)
        client:close()

        if not (res and res.status == 200) then
            local reason = err or "HTTP " .. (res and res.status or "error")
            return { success = false, error = reason }
        end
        return { success = true, data = res.body }
    end

    return monitor_api_call("get_user", fetch_user, user_id)
end
-- Collect every key/value pair currently stored in ngx.shared.my_stats
-- into a plain Lua table and return it.
local function get_api_stats()
    local dict = ngx.shared.my_stats
    local snapshot = {}

    local names = dict:get_keys(0)
    for idx = 1, #names do
        local name = names[idx]
        snapshot[name] = dict:get(name)
    end

    return snapshot
end
# Statistics endpoint
location /api-stats {
    content_by_lua_block {
        -- A function declared `local` in another Lua chunk is not visible
        -- inside this block, so the statistics are collected inline.
        local cjson = require "cjson"
        local dict = ngx.shared.my_stats

        local stats = {}
        if dict then
            for _, key in ipairs(dict:get_keys(0)) do
                stats[key] = dict:get(key)
            end
        end

        ngx.header.content_type = "application/json"
        ngx.say(cjson.encode(stats))
    }
}
6. 实际应用场景
6.1 API网关代理
-- API网关实现
local http = require "resty.http"
local cjson = require "cjson"
-- Static service registry: service name -> list of backend base URLs
local services = {
    user = {
        "http://user-service-1:8080",
        "http://user-service-2:8080",
    },
    order = {
        "http://order-service-1:8080",
        "http://order-service-2:8080",
    },
    payment = {
        "http://payment-service:8080",
    },
}

-- Pick a backend base URL for a service.
-- The choice is derived from the current time in seconds modulo the number
-- of backends, so every request handled within the same second gets the
-- same backend.
-- Returns the URL, or nil plus "Service not found" for an unknown service.
local function get_backend_url(service_name)
    local candidates = services[service_name]
    if not candidates then
        return nil, "Service not found"
    end

    local slot = ngx.time() % #candidates + 1
    return candidates[slot]
end
-- Request headers that are not forwarded to the backend: the hop-by-hop
-- headers, plus Content-Length so that it is derived from the body that is
-- actually sent.
local request_headers_to_skip = {
    ["connection"] = true,
    ["keep-alive"] = true,
    ["proxy-authenticate"] = true,
    ["proxy-authorization"] = true,
    ["te"] = true,
    ["trailers"] = true,
    ["transfer-encoding"] = true,
    ["upgrade"] = true,
    ["content-length"] = true,
}

-- Return the client request body as a string, or nil when there is none.
local function read_request_body()
    -- The body is only available after it has been read explicitly;
    -- ngx.var.request_body is nil otherwise.
    ngx.req.read_body()

    local body = ngx.req.get_body_data()
    if body then
        return body
    end

    -- A body that was buffered to a temporary file is not returned by
    -- get_body_data(); read it back from that file.
    local path = ngx.req.get_body_file()
    if not path then
        return nil
    end
    local file, err = io.open(path, "rb")
    if not file then
        ngx.log(ngx.ERR, "failed to open buffered request body: ", err)
        return nil
    end
    body = file:read("*a")
    file:close()
    return body
end

--- Forward the current client request to a backend of `service_name`.
-- @param service_name  key understood by get_backend_url()
-- @param path          path appended to the backend base URL
-- @return the backend response, or nil plus an error message
local function proxy_request(service_name, path)
    local backend_url, err = get_backend_url(service_name)
    if not backend_url then
        return nil, err
    end

    -- Copy the client headers, leaving out the ones listed above
    local headers = {}
    for name, value in pairs(ngx.req.get_headers()) do
        if not request_headers_to_skip[string.lower(name)] then
            headers[name] = value
        end
    end

    local httpc = http.new()
    httpc:set_timeout(10000)

    -- Forward the request; request_uri() releases the connection itself
    local res, request_err = httpc:request_uri(backend_url .. path, {
        method = ngx.var.request_method,
        body = read_request_body(),
        headers = headers,
        query = ngx.req.get_uri_args(),
    })

    return res, request_err
end
-- Route handling: /api/<service>/<rest> is forwarded to <service> as /<rest>
local uri = ngx.var.uri
local service_name, path = string.match(uri, "^/api/([^/]+)(.*)$")
if not service_name then
    ngx.status = 404
    ngx.say("Invalid API path")
    return
end

local res, err = proxy_request(service_name, path)
if not res then
    ngx.status = 502
    ngx.say("Backend service unavailable: ", err)
    return
end

-- Hop-by-hop response headers describe the backend connection, not the one
-- to the client, and must not be copied.
local response_headers_to_skip = {
    ["connection"] = true,
    ["keep-alive"] = true,
    ["proxy-authenticate"] = true,
    ["te"] = true,
    ["trailers"] = true,
    ["transfer-encoding"] = true,
    ["upgrade"] = true,
}

-- Relay status, headers and body
ngx.status = res.status
for name, value in pairs(res.headers) do
    if not response_headers_to_skip[string.lower(name)] then
        ngx.header[name] = value
    end
end

-- ngx.print sends the body unchanged; ngx.say would append a newline and
-- make the body disagree with the forwarded Content-Length.
ngx.print(res.body)
6.2 数据聚合服务
-- 数据聚合示例
local http = require "resty.http"
local cjson = require "cjson"
-- Run several HTTP requests concurrently, one light thread per request.
-- `requests` is a list of tables with `name`, `url` and optional `options`.
-- Returns a table keyed by request name; each entry carries `name`,
-- `success`, `data`, `error` and `status`. A thread that did not finish
-- successfully is reported under the key "request_<index>".
local function concurrent_requests(requests)
    -- Work done inside each light thread
    local function fetch(req)
        local client = http.new()
        client:set_timeout(5000)
        local res, err = client:request_uri(req.url, req.options or {})
        client:close()

        return {
            name = req.name,
            success = res ~= nil,
            data = res and res.body or nil,
            error = err,
            status = res and res.status or nil,
        }
    end

    -- Start all requests
    local threads = {}
    for i, req in ipairs(requests) do
        threads[i] = ngx.thread.spawn(fetch, req)
    end

    -- Wait for every request to finish and collect the outcomes
    local results = {}
    for i, thread in ipairs(threads) do
        local ok, outcome = ngx.thread.wait(thread)
        if not ok then
            results["request_" .. i] = {
                success = false,
                error = "Thread execution failed",
            }
        else
            results[outcome.name] = outcome
        end
    end

    return results
end
-- Decode the JSON payload of one sub-request result.
-- Returns the decoded value, or {error = fallback_message} when the
-- sub-request failed or its body is not valid JSON.
local function decode_section(result, fallback_message)
    if result and result.success then
        -- cjson.decode raises on malformed input; a bad body from one
        -- backend must not abort the whole aggregation.
        local ok, decoded = pcall(cjson.decode, result.data)
        if ok then
            return decoded
        end
    end
    return { error = fallback_message }
end

--- Aggregate profile, orders and preferences of a user.
-- The three backends are queried concurrently via concurrent_requests().
-- get_auth_token() is not defined in this snippet and has to be provided
-- by the surrounding code.
-- NOTE(review): user_id is interpolated into the URL paths as-is; callers
-- should validate or escape it.
-- @param user_id  identifier inserted into the backend URLs
-- @return table with `user_id`, `timestamp`, `profile`, `orders` and
--         `preferences`; a section that could not be loaded is
--         {error = "..."}
local function get_user_details(user_id)
    -- Fetch the token once and give every request its own option table
    local token = get_auth_token()
    local function auth_options()
        return {
            headers = { ["Authorization"] = "Bearer " .. token },
        }
    end

    local requests = {
        {
            name = "profile",
            url = "https://user-api.example.com/users/" .. user_id,
            options = auth_options(),
        },
        {
            name = "orders",
            url = "https://order-api.example.com/users/" .. user_id .. "/orders",
            options = auth_options(),
        },
        {
            name = "preferences",
            url = "https://pref-api.example.com/users/" .. user_id,
            options = auth_options(),
        },
    }

    local results = concurrent_requests(requests)

    -- Assemble the response
    return {
        user_id = user_id,
        timestamp = ngx.time(),
        profile = decode_section(results.profile, "Failed to load profile"),
        orders = decode_section(results.orders, "Failed to load orders"),
        preferences = decode_section(results.preferences, "Failed to load preferences"),
    }
end
-- API endpoint: expects the user id in the `user_id` query argument
local user_id = ngx.var.arg_user_id

if user_id then
    local details = get_user_details(user_id)
    ngx.header.content_type = "application/json"
    ngx.say(cjson.encode(details))
else
    -- The argument is absent: reject the request
    ngx.status = 400
    ngx.say("Missing user_id parameter")
    return
end
7. 性能优化
7.1 连接池优化
-- Global connection pool management
local _M = {}
-- Per-service connection settings, keyed by the service name that
-- get_connection() and release_connection() receive.
--   host, port, scheme, pool_name, pool_size, backlog:
--       passed to httpc:connect() in get_connection()
--   keepalive_timeout, keepalive_pool:
--       passed to httpc:set_keepalive() in release_connection()
local pool_configs = {
api_service = {
host = "api.example.com",
port = 443,
scheme = "https",
pool_name = "api_pool",
pool_size = 100,
backlog = 50,
keepalive_timeout = 60000,
keepalive_pool = 50
}
}
--- Open a connection for a configured service.
-- @param service_name  key into pool_configs
-- @return a connected client, or nil plus an error message
function _M.get_connection(service_name)
    local config = pool_configs[service_name]
    if not config then
        -- tostring() keeps this error path from failing on a nil name
        return nil, "Unknown service: " .. tostring(service_name)
    end

    local httpc = require("resty.http").new()

    -- Single-table form of connect() (lua-resty-http >= 0.16). The
    -- positional connect(host, port, opts) form opens a plain TCP socket
    -- and does not act on `scheme`, so an "https" service would not get a
    -- TLS handshake.
    local ok, err = httpc:connect({
        scheme = config.scheme,
        host = config.host,
        port = config.port,
        pool = config.pool_name,
        pool_size = config.pool_size,
        backlog = config.backlog,
    })
    if not ok then
        return nil, "Connection failed: " .. tostring(err)
    end

    return httpc
end
-- Hand a connection back after use.
-- For a configured service the connection is offered to the keepalive pool
-- with that service's timeout and pool size; if that fails, or if the
-- service is unknown, the connection is closed instead.
function _M.release_connection(httpc, service_name)
    local config = pool_configs[service_name]

    if config then
        local ok, err = httpc:set_keepalive(config.keepalive_timeout, config.keepalive_pool)
        if ok then
            return
        end
        ngx.log(ngx.WARN, "Failed to set keepalive: ", err)
    end

    httpc:close()
end

return _M
7.2 缓存策略
local http = require "resty.http"
local cjson = require "cjson"
--- Fetch JSON from `url`, serving it from ngx.shared.my_cache when possible.
-- @param cache_key  key under which the raw response body is cached
-- @param url        URL passed to request_uri()
-- @param options    option table forwarded to request_uri()
-- @param ttl        cache lifetime in seconds (default 300)
-- @return the decoded JSON value, or nil plus an error message
local function cached_api_call(cache_key, url, options, ttl)
    ttl = ttl or 300  -- cache for 5 minutes by default
    local cache = ngx.shared.my_cache

    -- Try the cache first
    local cached_data = cache:get(cache_key)
    if cached_data then
        -- cjson.decode raises on malformed input; an undecodable entry is
        -- treated as a miss and refreshed below.
        local ok, decoded = pcall(cjson.decode, cached_data)
        if ok then
            ngx.log(ngx.INFO, "Cache hit for: ", cache_key)
            return decoded
        end
        ngx.log(ngx.WARN, "Discarding undecodable cache entry for: ", cache_key)
    end

    -- Cache miss: call the API
    ngx.log(ngx.INFO, "Cache miss for: ", cache_key)
    local httpc = http.new()
    httpc:set_timeout(5000)
    -- request_uri() releases the connection itself
    local res, err = httpc:request_uri(url, options)

    if not res then
        return nil, err
    end
    if res.status ~= 200 then
        return nil, "HTTP " .. tostring(res.status)
    end

    -- Decode before caching so that an invalid body is never stored
    local ok, data = pcall(cjson.decode, res.body)
    if not ok then
        return nil, "invalid JSON in response: " .. tostring(data)
    end

    -- Store the raw body; a failed store is only logged
    local stored, set_err = cache:set(cache_key, res.body, ttl)
    if not stored then
        ngx.log(ngx.WARN, "Failed to cache data: ", set_err)
    end

    return data
end
-- Usage example
local user_id = ngx.var.arg_user_id

-- Without this check a request lacking `user_id` would fail on the string
-- concatenations below.
if not user_id or user_id == "" then
    ngx.status = 400
    ngx.say("Missing user_id parameter")
    return
end

-- NOTE(review): user_id comes straight from the query string and is put
-- into the URL path and the cache key as-is; validate it if the id format
-- is known.
local cache_key = "user_profile:" .. user_id

-- get_auth_token() is not defined in this snippet and has to be provided
-- by the surrounding code.
local user_data, err = cached_api_call(
    cache_key,
    "https://api.example.com/users/" .. user_id,
    {
        headers = { ["Authorization"] = "Bearer " .. get_auth_token() },
    },
    600  -- cache for 10 minutes
)

if user_data then
    ngx.header.content_type = "application/json"
    ngx.say(cjson.encode(user_data))
else
    ngx.status = 500
    ngx.say("Failed to get user data: ", err)
end
8. 总结
lua-resty-http是OpenResty生态中不可或缺的HTTP客户端库,它提供了完整的HTTP/HTTPS客户端功能。通过合理使用连接池、缓存、错误处理和监控机制,我们可以构建高性能、高可用的API服务。掌握这些技能对于开发现代Web应用和微服务架构至关重要。