1. lua-resty-http简介

1.1 什么是lua-resty-http

lua-resty-http是OpenResty生态中最重要的HTTP客户端库,它提供了完整的HTTP/HTTPS客户端功能,支持连接池、SSL、代理等高级特性。

1.2 主要特性

  • 连接复用:自动管理连接池
  • SSL/TLS支持:完整的HTTPS支持
  • 流式处理:支持大文件上传下载
  • 代理支持:HTTP/HTTPS代理
  • 认证支持:Basic Auth、Bearer Token等
  • 超时控制:精确的超时管理
  • 重试机制:自动重试失败请求

1.3 安装和配置

# 通过opm安装
opm get ledgetech/lua-resty-http

# 或者手动下载
wget https://github.com/ledgetech/lua-resty-http/archive/master.zip
# nginx.conf配置
http {
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    
    # DNS解析器配置
    resolver 8.8.8.8 8.8.4.4 valid=300s;
    resolver_timeout 5s;
}

2. 基本用法

2.1 简单GET请求

local http = require "resty.http"
local httpc = http.new()

-- 基本GET请求
local res, err = httpc:request_uri("https://api.example.com/users")
if not res then
    ngx.log(ngx.ERR, "request failed: ", err)
    return
end

ngx.say("Status: ", res.status)
ngx.say("Body: ", res.body)

-- 关闭连接
httpc:close()

2.2 带参数的GET请求

local http = require "resty.http"
local httpc = http.new()

-- 构建查询参数
local args = {
    page = 1,
    limit = 10,
    sort = "created_at"
}

-- 方法1:手动构建URL
local query_string = ngx.encode_args(args)
local url = "https://api.example.com/users?" .. query_string

local res, err = httpc:request_uri(url)
if res then
    ngx.say(res.body)
end

-- 方法2:使用query参数
local res, err = httpc:request_uri("https://api.example.com/users", {
    method = "GET",
    query = args
})

httpc:close()

2.3 POST请求

local http = require "resty.http"
local cjson = require "cjson"
local httpc = http.new()

-- JSON数据
local data = {
    name = "John Doe",
    email = "john@example.com",
    age = 30
}

local res, err = httpc:request_uri("https://api.example.com/users", {
    method = "POST",
    body = cjson.encode(data),
    headers = {
        ["Content-Type"] = "application/json",
        ["Authorization"] = "Bearer your-token-here"
    }
})

if not res then
    ngx.log(ngx.ERR, "request failed: ", err)
    return
end

if res.status == 201 then
    ngx.say("User created successfully")
    ngx.say(res.body)
else
    ngx.say("Failed to create user: ", res.status)
end

httpc:close()

2.4 表单数据提交

local http = require "resty.http"
local httpc = http.new()

-- 表单数据
local form_data = {
    username = "john",
    password = "secret123",
    remember = "1"
}

local res, err = httpc:request_uri("https://api.example.com/login", {
    method = "POST",
    body = ngx.encode_args(form_data),
    headers = {
        ["Content-Type"] = "application/x-www-form-urlencoded"
    }
})

if res and res.status == 200 then
    -- 处理登录响应
    local response_data = require("cjson").decode(res.body)
    ngx.say("Login successful, token: ", response_data.token)
end

httpc:close()

3. 高级功能

3.1 连接池管理

local http = require "resty.http"

-- 创建HTTP客户端
local httpc = http.new()

-- 连接到服务器
local ok, err = httpc:connect("api.example.com", 443, {
    scheme = "https",
    ssl_verify = true,
    pool = "api_pool",  -- 连接池名称
    pool_size = 100,    -- 连接池大小
    backlog = 50        -- 等待队列大小
})

if not ok then
    ngx.log(ngx.ERR, "failed to connect: ", err)
    return
end

-- 发送请求
local res, err = httpc:request({
    method = "GET",
    path = "/api/users",
    headers = {
        ["Host"] = "api.example.com",
        ["User-Agent"] = "OpenResty/1.21"
    }
})

if res then
    -- 读取响应体
    local body, err = res:read_body()
    if body then
        ngx.say(body)
    end
end

-- 保持连接到池中
local ok, err = httpc:set_keepalive(30000, 100)  -- 30秒超时,最多100个连接
if not ok then
    ngx.log(ngx.ERR, "failed to set keepalive: ", err)
end

3.2 超时控制

local http = require "resty.http"
local httpc = http.new()

-- 设置超时时间
httpc:set_timeout(5000)  -- 5秒总超时

local res, err = httpc:request_uri("https://slow-api.example.com/data", {
    method = "GET",
    headers = {
        ["Accept"] = "application/json"
    }
})

if not res then
    if err == "timeout" then
        ngx.log(ngx.WARN, "Request timed out")
        ngx.status = 504
        ngx.say("Gateway Timeout")
    else
        ngx.log(ngx.ERR, "Request failed: ", err)
        ngx.status = 502
        ngx.say("Bad Gateway")
    end
    return
end

ngx.say(res.body)
httpc:close()

3.3 重试机制

local http = require "resty.http"

local function make_request_with_retry(url, options, max_retries)
    max_retries = max_retries or 3
    local last_err
    
    for i = 1, max_retries do
        local httpc = http.new()
        httpc:set_timeout(5000)
        
        local res, err = httpc:request_uri(url, options)
        
        if res then
            httpc:close()
            return res, nil
        end
        
        last_err = err
        httpc:close()
        
        -- 指数退避
        if i < max_retries then
            local delay = math.min(1000 * (2 ^ (i - 1)), 10000)  -- 最大10秒
            ngx.sleep(delay / 1000)
        end
        
        ngx.log(ngx.WARN, "Request attempt ", i, " failed: ", err)
    end
    
    return nil, "Max retries exceeded. Last error: " .. (last_err or "unknown")
end

-- 使用重试机制
local res, err = make_request_with_retry("https://unreliable-api.example.com/data", {
    method = "GET",
    headers = {
        ["Authorization"] = "Bearer token"
    }
})

if res then
    ngx.say("Success: ", res.body)
else
    ngx.log(ngx.ERR, "All retries failed: ", err)
    ngx.status = 502
    ngx.say("Service temporarily unavailable")
end

3.4 流式处理

local http = require "resty.http"
local httpc = http.new()

-- 连接到服务器
local ok, err = httpc:connect("api.example.com", 443, {
    scheme = "https"
})

if not ok then
    ngx.log(ngx.ERR, "failed to connect: ", err)
    return
end

-- 发送请求
local res, err = httpc:request({
    method = "GET",
    path = "/api/large-file",
    headers = {
        ["Host"] = "api.example.com"
    }
})

if not res then
    ngx.log(ngx.ERR, "request failed: ", err)
    return
end

-- 流式读取响应
local total_size = 0
while true do
    local chunk, err = res:read_body(8192)  -- 每次读取8KB
    if err then
        ngx.log(ngx.ERR, "read body failed: ", err)
        break
    end
    
    if not chunk then
        break  -- 读取完成
    end
    
    total_size = total_size + #chunk
    ngx.print(chunk)  -- 流式输出
    ngx.flush(true)   -- 立即刷新
end

ngx.log(ngx.INFO, "Total bytes read: ", total_size)
httpc:close()

4. 认证和安全

4.1 Basic认证

local http = require "resty.http"
local httpc = http.new()

-- 方法1:手动构建Authorization头
local username = "admin"
local password = "secret"
local auth_string = ngx.encode_base64(username .. ":" .. password)

local res, err = httpc:request_uri("https://api.example.com/protected", {
    method = "GET",
    headers = {
        ["Authorization"] = "Basic " .. auth_string
    }
})

-- 方法2:使用auth选项
local res, err = httpc:request_uri("https://api.example.com/protected", {
    method = "GET",
    auth = {
        username = username,
        password = password
    }
})

if res and res.status == 200 then
    ngx.say("Authentication successful")
    ngx.say(res.body)
else
    ngx.say("Authentication failed")
end

httpc:close()

4.2 Bearer Token认证

local http = require "resty.http"
local cjson = require "cjson"

-- 获取访问令牌
local function get_access_token()
    local httpc = http.new()
    
    local res, err = httpc:request_uri("https://auth.example.com/oauth/token", {
        method = "POST",
        body = ngx.encode_args({
            grant_type = "client_credentials",
            client_id = "your_client_id",
            client_secret = "your_client_secret",
            scope = "api:read api:write"
        }),
        headers = {
            ["Content-Type"] = "application/x-www-form-urlencoded"
        }
    })
    
    httpc:close()
    
    if res and res.status == 200 then
        local token_data = cjson.decode(res.body)
        return token_data.access_token
    end
    
    return nil
end

-- 使用令牌调用API
local function call_api_with_token(endpoint, token)
    local httpc = http.new()
    
    local res, err = httpc:request_uri("https://api.example.com" .. endpoint, {
        method = "GET",
        headers = {
            ["Authorization"] = "Bearer " .. token,
            ["Accept"] = "application/json"
        }
    })
    
    httpc:close()
    return res, err
end

-- 主逻辑
local token = get_access_token()
if not token then
    ngx.status = 401
    ngx.say("Failed to obtain access token")
    return
end

local res, err = call_api_with_token("/api/users", token)
if res then
    ngx.header.content_type = "application/json"
    ngx.say(res.body)
else
    ngx.status = 502
    ngx.say("API call failed: ", err)
end

4.3 SSL/TLS配置

local http = require "resty.http"
local httpc = http.new()

-- SSL选项配置
local ssl_opts = {
    verify = true,                    -- 验证服务器证书
    server_name = "api.example.com", -- SNI服务器名
    depth = 2,                       -- 证书链验证深度
    cert = "/path/to/client.crt",    -- 客户端证书
    key = "/path/to/client.key"      -- 客户端私钥
}

local ok, err = httpc:connect("api.example.com", 443, {
    scheme = "https",
    ssl_opts = ssl_opts
})

if not ok then
    ngx.log(ngx.ERR, "SSL connection failed: ", err)
    return
end

-- 发送请求
local res, err = httpc:request({
    method = "GET",
    path = "/api/secure-data",
    headers = {
        ["Host"] = "api.example.com"
    }
})

if res then
    local body = res:read_body()
    ngx.say(body)
end

httpc:close()

5. 错误处理和监控

5.1 完整的错误处理

local http = require "resty.http"
local cjson = require "cjson"

local function handle_api_error(res, err, context)
    if not res then
        -- 网络错误
        ngx.log(ngx.ERR, "Network error in ", context, ": ", err)
        return {
            success = false,
            error = "network_error",
            message = "Network connection failed"
        }
    end
    
    if res.status >= 500 then
        -- 服务器错误
        ngx.log(ngx.ERR, "Server error in ", context, ": ", res.status)
        return {
            success = false,
            error = "server_error",
            message = "Internal server error",
            status = res.status
        }
    elseif res.status >= 400 then
        -- 客户端错误
        local error_body = res.body
        local error_data
        
        -- 尝试解析错误响应
        local ok, parsed = pcall(cjson.decode, error_body)
        if ok then
            error_data = parsed
        else
            error_data = {message = error_body}
        end
        
        ngx.log(ngx.WARN, "Client error in ", context, ": ", res.status, " ", error_body)
        return {
            success = false,
            error = "client_error",
            message = error_data.message or "Bad request",
            status = res.status,
            details = error_data
        }
    end
    
    -- 成功响应
    return {
        success = true,
        data = res.body,
        status = res.status
    }
end

-- 使用错误处理
local function get_user_data(user_id)
    local httpc = http.new()
    httpc:set_timeout(5000)
    
    local res, err = httpc:request_uri("https://api.example.com/users/" .. user_id, {
        method = "GET",
        headers = {
            ["Authorization"] = "Bearer " .. get_auth_token(),
            ["Accept"] = "application/json"
        }
    })
    
    httpc:close()
    
    return handle_api_error(res, err, "get_user_data")
end

-- API调用
local result = get_user_data(123)
if result.success then
    local user_data = cjson.decode(result.data)
    ngx.header.content_type = "application/json"
    ngx.say(cjson.encode({
        success = true,
        user = user_data
    }))
else
    ngx.status = result.status or 500
    ngx.header.content_type = "application/json"
    ngx.say(cjson.encode({
        success = false,
        error = result.error,
        message = result.message
    }))
end

5.2 性能监控

local http = require "resty.http"
local cjson = require "cjson"

-- 性能监控装饰器
local function monitor_api_call(name, func, ...)
    local start_time = ngx.now()
    local cache = ngx.shared.my_stats
    
    -- 增加调用计数
    local count_key = "api_calls:" .. name
    cache:incr(count_key, 1, 0, 86400)  -- 24小时过期
    
    -- 执行函数
    local result = func(...)
    
    -- 记录执行时间
    local duration = ngx.now() - start_time
    local time_key = "api_time:" .. name
    local total_time = cache:get(time_key) or 0
    cache:set(time_key, total_time + duration, 86400)
    
    -- 记录错误
    if not result.success then
        local error_key = "api_errors:" .. name
        cache:incr(error_key, 1, 0, 86400)
    end
    
    -- 记录慢查询
    if duration > 1.0 then  -- 超过1秒
        ngx.log(ngx.WARN, "Slow API call: ", name, " took ", duration, "s")
        local slow_key = "api_slow:" .. name
        cache:incr(slow_key, 1, 0, 86400)
    end
    
    return result
end

-- 监控的API调用函数
local function monitored_get_user(user_id)
    return monitor_api_call("get_user", function(uid)
        local httpc = http.new()
        local res, err = httpc:request_uri("https://api.example.com/users/" .. uid)
        httpc:close()
        
        if res and res.status == 200 then
            return {success = true, data = res.body}
        else
            return {success = false, error = err or "HTTP " .. (res and res.status or "error")}
        end
    end, user_id)
end

-- 获取监控统计
local function get_api_stats()
    local cache = ngx.shared.my_stats
    local stats = {}
    
    local keys = cache:get_keys(0)
    for _, key in ipairs(keys) do
        local value = cache:get(key)
        stats[key] = value
    end
    
    return stats
end

-- 统计接口
location /api-stats {
    content_by_lua_block {
        local stats = get_api_stats()
        ngx.header.content_type = "application/json"
        ngx.say(require("cjson").encode(stats))
    }
}

6. 实际应用场景

6.1 API网关代理

-- API网关实现
local http = require "resty.http"
local cjson = require "cjson"

-- 服务发现
local services = {
    user = {"http://user-service-1:8080", "http://user-service-2:8080"},
    order = {"http://order-service-1:8080", "http://order-service-2:8080"},
    payment = {"http://payment-service:8080"}
}

-- 负载均衡
local function get_backend_url(service_name)
    local backends = services[service_name]
    if not backends then
        return nil, "Service not found"
    end
    
    -- 简单轮询
    local index = (ngx.time() % #backends) + 1
    return backends[index]
end

-- 代理请求
local function proxy_request(service_name, path)
    local backend_url, err = get_backend_url(service_name)
    if not backend_url then
        return nil, err
    end
    
    local httpc = http.new()
    httpc:set_timeout(10000)
    
    -- 转发请求
    local res, err = httpc:request_uri(backend_url .. path, {
        method = ngx.var.request_method,
        body = ngx.var.request_body,
        headers = ngx.req.get_headers(),
        query = ngx.req.get_uri_args()
    })
    
    httpc:close()
    return res, err
end

-- 路由处理
local uri = ngx.var.uri
local service_name, path = string.match(uri, "^/api/([^/]+)(.*)$")

if not service_name then
    ngx.status = 404
    ngx.say("Invalid API path")
    return
end

local res, err = proxy_request(service_name, path)
if not res then
    ngx.status = 502
    ngx.say("Backend service unavailable: ", err)
    return
end

-- 返回响应
ngx.status = res.status
for name, value in pairs(res.headers) do
    ngx.header[name] = value
end
ngx.say(res.body)

6.2 数据聚合服务

-- 数据聚合示例
local http = require "resty.http"
local cjson = require "cjson"

-- 并发请求函数
local function concurrent_requests(requests)
    local threads = {}
    
    -- 启动所有请求
    for i, req in ipairs(requests) do
        local thread = ngx.thread.spawn(function()
            local httpc = http.new()
            httpc:set_timeout(5000)
            
            local res, err = httpc:request_uri(req.url, req.options or {})
            httpc:close()
            
            return {
                name = req.name,
                success = res ~= nil,
                data = res and res.body or nil,
                error = err,
                status = res and res.status or nil
            }
        end)
        threads[i] = thread
    end
    
    -- 等待所有请求完成
    local results = {}
    for i, thread in ipairs(threads) do
        local ok, result = ngx.thread.wait(thread)
        if ok then
            results[result.name] = result
        else
            results["request_" .. i] = {
                success = false,
                error = "Thread execution failed"
            }
        end
    end
    
    return results
end

-- 用户详情聚合
local function get_user_details(user_id)
    local requests = {
        {
            name = "profile",
            url = "https://user-api.example.com/users/" .. user_id,
            options = {
                headers = {["Authorization"] = "Bearer " .. get_auth_token()}
            }
        },
        {
            name = "orders",
            url = "https://order-api.example.com/users/" .. user_id .. "/orders",
            options = {
                headers = {["Authorization"] = "Bearer " .. get_auth_token()}
            }
        },
        {
            name = "preferences",
            url = "https://pref-api.example.com/users/" .. user_id,
            options = {
                headers = {["Authorization"] = "Bearer " .. get_auth_token()}
            }
        }
    }
    
    local results = concurrent_requests(requests)
    
    -- 组装响应
    local response = {
        user_id = user_id,
        timestamp = ngx.time()
    }
    
    -- 处理用户资料
    if results.profile and results.profile.success then
        local profile_data = cjson.decode(results.profile.data)
        response.profile = profile_data
    else
        response.profile = {error = "Failed to load profile"}
    end
    
    -- 处理订单数据
    if results.orders and results.orders.success then
        local orders_data = cjson.decode(results.orders.data)
        response.orders = orders_data
    else
        response.orders = {error = "Failed to load orders"}
    end
    
    -- 处理偏好设置
    if results.preferences and results.preferences.success then
        local pref_data = cjson.decode(results.preferences.data)
        response.preferences = pref_data
    else
        response.preferences = {error = "Failed to load preferences"}
    end
    
    return response
end

-- API端点
local user_id = ngx.var.arg_user_id
if not user_id then
    ngx.status = 400
    ngx.say("Missing user_id parameter")
    return
end

local user_details = get_user_details(user_id)
ngx.header.content_type = "application/json"
ngx.say(cjson.encode(user_details))

7. 性能优化

7.1 连接池优化

-- 全局连接池管理
local _M = {}

-- 连接池配置
local pool_configs = {
    api_service = {
        host = "api.example.com",
        port = 443,
        scheme = "https",
        pool_name = "api_pool",
        pool_size = 100,
        backlog = 50,
        keepalive_timeout = 60000,
        keepalive_pool = 50
    }
}

-- 获取连接
function _M.get_connection(service_name)
    local config = pool_configs[service_name]
    if not config then
        return nil, "Unknown service: " .. service_name
    end
    
    local httpc = require("resty.http").new()
    
    local ok, err = httpc:connect(config.host, config.port, {
        scheme = config.scheme,
        pool = config.pool_name,
        pool_size = config.pool_size,
        backlog = config.backlog
    })
    
    if not ok then
        return nil, "Connection failed: " .. err
    end
    
    return httpc
end

-- 释放连接
function _M.release_connection(httpc, service_name)
    local config = pool_configs[service_name]
    if not config then
        httpc:close()
        return
    end
    
    local ok, err = httpc:set_keepalive(
        config.keepalive_timeout,
        config.keepalive_pool
    )
    
    if not ok then
        ngx.log(ngx.WARN, "Failed to set keepalive: ", err)
        httpc:close()
    end
end

return _M

7.2 缓存策略

local http = require "resty.http"
local cjson = require "cjson"

-- 带缓存的API调用
local function cached_api_call(cache_key, url, options, ttl)
    ttl = ttl or 300  -- 默认5分钟缓存
    
    local cache = ngx.shared.my_cache
    
    -- 尝试从缓存获取
    local cached_data = cache:get(cache_key)
    if cached_data then
        ngx.log(ngx.INFO, "Cache hit for: ", cache_key)
        return cjson.decode(cached_data)
    end
    
    -- 缓存未命中,调用API
    ngx.log(ngx.INFO, "Cache miss for: ", cache_key)
    
    local httpc = http.new()
    httpc:set_timeout(5000)
    
    local res, err = httpc:request_uri(url, options)
    httpc:close()
    
    if not res or res.status ~= 200 then
        return nil, err or ("HTTP " .. res.status)
    end
    
    local data = cjson.decode(res.body)
    
    -- 存入缓存
    local ok, err = cache:set(cache_key, res.body, ttl)
    if not ok then
        ngx.log(ngx.WARN, "Failed to cache data: ", err)
    end
    
    return data
end

-- 使用示例
local user_id = ngx.var.arg_user_id
local cache_key = "user_profile:" .. user_id

local user_data, err = cached_api_call(
    cache_key,
    "https://api.example.com/users/" .. user_id,
    {
        headers = {["Authorization"] = "Bearer " .. get_auth_token()}
    },
    600  -- 10分钟缓存
)

if user_data then
    ngx.header.content_type = "application/json"
    ngx.say(cjson.encode(user_data))
else
    ngx.status = 500
    ngx.say("Failed to get user data: ", err)
end

8. 总结

lua-resty-http是OpenResty生态中不可或缺的HTTP客户端库,它提供了完整的HTTP/HTTPS客户端功能。通过合理使用连接池、缓存、错误处理和监控机制,我们可以构建高性能、高可用的API服务。掌握这些技能对于开发现代Web应用和微服务架构至关重要。

9. 参考资料