1. 测试框架概述
1.1 OpenResty测试生态
OpenResty提供了完整的测试框架生态系统,主要包括:
- Test::Nginx:专门为Nginx和OpenResty设计的测试框架
- lua-resty-test:Lua单元测试框架
- busted:BDD风格的Lua测试框架
- 性能测试工具:wrk、ab、JMeter等
1.2 测试类型
测试金字塔
┌─────────────────┐
│ 端到端测试 │ ← 少量,高价值
├─────────────────┤
│ 集成测试 │ ← 适量,关键路径
├─────────────────┤
│ 单元测试 │ ← 大量,快速反馈
└─────────────────┘
2. Test::Nginx框架
2.1 安装配置
# Install Test::Nginx from CPAN
cpan Test::Nginx
# Or install it through the system package manager
sudo apt-get install libtest-nginx-perl # Ubuntu/Debian
sudo yum install perl-Test-Nginx # CentOS/RHEL
2.2 基础测试结构
# t/001-basic.t - basic test file
# 'no_plan' means the number of assertions is not declared up front.
use Test::Nginx::Socket 'no_plan';
# Run every test block declared in the __DATA__ section below
run_tests();
__DATA__
=== TEST 1: 基本HTTP请求测试
--- config
location /hello {
content_by_lua_block {
ngx.say("Hello, World!")
}
}
--- request
GET /hello
--- response_body
Hello, World!
--- no_error_log
[error]
=== TEST 2: 带参数的请求测试
--- config
location /echo {
content_by_lua_block {
-- Echo the "msg" query argument, or a placeholder when it is absent
local args = ngx.req.get_uri_args()
ngx.say("Message: ", args.msg or "No message")
}
}
--- request
GET /echo?msg=test
--- response_body
Message: test
--- no_error_log
[error]
=== TEST 3: POST请求测试
--- config
location /post {
content_by_lua_block {
-- Read the request body first, then echo it back
ngx.req.read_body()
local body = ngx.req.get_body_data()
ngx.say("Received: ", body or "No body")
}
}
--- request
POST /post
Hello from POST
--- response_body
Received: Hello from POST
--- no_error_log
[error]
2.3 高级测试功能
# t/002-advanced.t - advanced test features
use Test::Nginx::Socket 'no_plan';
# http{}-level configuration shared by the test blocks that reference $::HttpConfig.
# NOTE(review): "/path/to/lua" looks like a placeholder path - confirm before use.
our $HttpConfig = <<'_EOC_';
lua_shared_dict test_cache 10m;
lua_package_path "/path/to/lua/?.lua;;";
_EOC_
run_tests();
__DATA__
=== TEST 1: 共享内存测试
--- http_config eval: $::HttpConfig
--- config
location /cache {
content_by_lua_block {
-- With a "value" argument the key is stored for 60 seconds; without it the key is read back
local cache = ngx.shared.test_cache
local key = ngx.var.arg_key
local value = ngx.var.arg_value
if value then
cache:set(key, value, 60)
ngx.say("Set: ", key, " = ", value)
else
local cached_value = cache:get(key)
ngx.say("Get: ", key, " = ", cached_value or "nil")
end
}
}
--- request eval
["GET /cache?key=test&value=hello",
"GET /cache?key=test"]
--- response_body eval
["Set: test = hello\n",
"Get: test = hello\n"]
--- no_error_log
[error]
=== TEST 2: 错误处理测试
--- config
location /error {
content_by_lua_block {
-- Only the "divide_by_zero" action is exercised by the request below
local action = ngx.var.arg_action
if action == "divide_by_zero" then
local result = 10 / 0
ngx.say("Result: ", result)
elseif action == "nil_access" then
local obj = nil
ngx.say("Value: ", obj.field)
else
ngx.say("No error")
end
}
}
--- request
GET /error?action=divide_by_zero
--- response_body
Result: inf
--- no_error_log
[error]
=== TEST 3: 超时测试
--- config
location /timeout {
content_by_lua_block {
-- Sleep 0.1s, well inside the 1-second timeout declared below
ngx.sleep(0.1)
ngx.say("Completed")
}
}
--- request
GET /timeout
--- response_body
Completed
--- timeout: 1
--- no_error_log
[error]
=== TEST 4: 多阶段测试
--- config
location /multi_phase {
    access_by_lua_block {
        -- Stash the access-phase timestamp in the per-request context
        ngx.ctx.access_time = ngx.now()
    }
    content_by_lua_block {
        -- ngx.now() returns nginx's cached time, so both phases usually see
        -- the same value and "Diff" prints as a bare "0"; a timestamp that
        -- falls on a whole second also prints without a fraction. The
        -- expected pattern below therefore treats the fraction as optional.
        local access_time = ngx.ctx.access_time
        local content_time = ngx.now()
        ngx.say("Access: ", access_time)
        ngx.say("Content: ", content_time)
        ngx.say("Diff: ", content_time - access_time)
    }
}
--- request
GET /multi_phase
--- response_body_like
^Access: \d+(?:\.\d+)?
Content: \d+(?:\.\d+)?
Diff: \d+(?:\.\d+)?
$
--- no_error_log
[error]
2.4 数据库测试
# t/003-database.t - database test
use Test::Nginx::Socket 'no_plan';
our $HttpConfig = <<'_EOC_';
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
init_by_lua_block {
-- Preload the MySQL driver module (no connection is opened here)
require "resty.mysql"
}
_EOC_
run_tests();
__DATA__
=== TEST 1: MySQL连接测试
--- http_config eval: $::HttpConfig
--- config
location /db_test {
content_by_lua_block {
local mysql = require "resty.mysql"
local db, err = mysql:new()
if not db then
ngx.say("Failed to create mysql object: ", err)
return
end
-- 1000 ms timeout for the socket operations below
db:set_timeout(1000)
local ok, err = db:connect({
host = "127.0.0.1",
port = 3306,
database = "test",
user = "test",
password = "test",
charset = "utf8",
max_packet_size = 1024 * 1024
})
if not ok then
ngx.say("Failed to connect: ", err)
return
end
-- Run a trivial query to prove the connection works
local res, err = db:query("SELECT 1 as test_value")
if not res then
ngx.say("Query failed: ", err)
return
end
ngx.say("Test value: ", res[1].test_value)
-- Return the connection to the keepalive pool (10s idle timeout, pool size 100)
db:set_keepalive(10000, 100)
}
}
--- request
GET /db_test
--- response_body
Test value: 1
--- no_error_log
[error]
3. Lua单元测试
3.1 lua-resty-test框架
-- lib/utils.lua - 被测试的工具模块
local utils = {}
-- 字符串工具函数
--- Strip leading and trailing whitespace from a string.
-- @param str string to trim; a falsy value yields nil
-- @return the trimmed string, or nil when `str` is falsy
function utils.trim(str)
  if str then
    local trimmed = str:match("^%s*(.-)%s*$")
    return trimmed
  end
  return nil
end
--- Split a string into its non-empty fields.
-- Every character of `delimiter` acts as a separator (it is used as a
-- character set, not as a multi-character token), and empty fields are
-- skipped, so "a,,b" yields {"a", "b"}.
-- @param str string to split; a falsy value yields an empty table
-- @param delimiter separator character(s); nil or "" returns {str} unsplit
-- @return array of fields
function utils.split(str, delimiter)
  if not str then return {} end
  if delimiter == nil or delimiter == "" then
    return { str }
  end
  -- Escape every non-alphanumeric byte so that pattern-magic characters
  -- such as "%", "]", "^" and "-" are matched literally inside the set.
  local class = delimiter:gsub("%W", "%%%0")
  local result = {}
  for field in str:gmatch("([^" .. class .. "]+)") do
    result[#result + 1] = field
  end
  return result
end
-- 数学工具函数
--- Round a number half-up to a given number of decimal places.
-- @param num number to round
-- @param decimals decimal places to keep (defaults to 0)
-- @return the rounded number
function utils.round(num, decimals)
  local places = decimals or 0
  local scale = 10 ^ places
  local shifted = num * scale + 0.5
  return math.floor(shifted) / scale
end
--- Restrict a value to the range [min_val, max_val].
-- @param value number to clamp
-- @param min_val lower bound
-- @param max_val upper bound
-- @return the clamped number
function utils.clamp(value, min_val, max_val)
  local capped = math.min(max_val, value)
  return math.max(min_val, capped)
end
-- 表工具函数
--- Report whether the array part of a table holds a value.
-- Only the sequence visited by ipairs is searched.
-- @param tbl array-like table
-- @param value value compared with ==
-- @return true when found, false otherwise
function utils.table_contains(tbl, value)
  local found = false
  for _, item in ipairs(tbl) do
    if item == value then
      found = true
      break
    end
  end
  return found
end
--- Shallow-merge two tables into a new table.
-- Keys from `t2` overwrite keys from `t1`; neither input is modified.
-- @param t1 first table
-- @param t2 second table (takes precedence)
-- @return a new table with the combined entries
function utils.table_merge(t1, t2)
  local merged = {}
  local function absorb(source)
    for key, val in pairs(source) do
      merged[key] = val
    end
  end
  absorb(t1)
  absorb(t2)
  return merged
end
return utils
-- t/unit/test_utils.lua - unit test file
-- NOTE(review): the describe/it/assert API used on the "resty.test" module
-- could not be confirmed from this file; verify it against the installed library.
local test = require "resty.test"
local utils = require "utils"
-- String function tests
test.describe("String utilities", function()
test.it("should trim whitespace", function()
test.assert.equal(utils.trim(" hello "), "hello")
test.assert.equal(utils.trim("\n\tworld\r\n"), "world")
test.assert.equal(utils.trim(""), "")
test.assert.equal(utils.trim(nil), nil)
end)
test.it("should split strings", function()
local result = utils.split("a,b,c", ",")
test.assert.equal(#result, 3)
test.assert.equal(result[1], "a")
test.assert.equal(result[2], "b")
test.assert.equal(result[3], "c")
-- A nil input must produce an empty table rather than an error
local empty_result = utils.split(nil, ",")
test.assert.equal(#empty_result, 0)
end)
end)
-- Math function tests
test.describe("Math utilities", function()
test.it("should round numbers correctly", function()
test.assert.equal(utils.round(3.14159, 2), 3.14)
test.assert.equal(utils.round(3.14159, 0), 3)
test.assert.equal(utils.round(3.6), 4)
end)
test.it("should clamp values", function()
test.assert.equal(utils.clamp(5, 1, 10), 5)
test.assert.equal(utils.clamp(-5, 1, 10), 1)
test.assert.equal(utils.clamp(15, 1, 10), 10)
end)
end)
-- Table function tests
test.describe("Table utilities", function()
test.it("should check if table contains value", function()
local tbl = {"a", "b", "c"}
test.assert.is_true(utils.table_contains(tbl, "b"))
test.assert.is_false(utils.table_contains(tbl, "d"))
end)
test.it("should merge tables", function()
local t1 = {a = 1, b = 2}
local t2 = {c = 3, d = 4}
local result = utils.table_merge(t1, t2)
test.assert.equal(result.a, 1)
test.assert.equal(result.b, 2)
test.assert.equal(result.c, 3)
test.assert.equal(result.d, 4)
end)
end)
-- Run the tests
test.run()
3.2 Busted测试框架
-- spec/utils_spec.lua - Busted spec for the utils module
local utils = require "utils"
describe("Utils module", function()
describe("String functions", function()
it("should trim whitespace from strings", function()
assert.are.equal("hello", utils.trim(" hello "))
assert.are.equal("world", utils.trim("\n\tworld\r\n"))
assert.are.equal("", utils.trim(""))
assert.is_nil(utils.trim(nil))
end)
it("should split strings by delimiter", function()
local result = utils.split("a,b,c", ",")
assert.are.equal(3, #result)
assert.are.equal("a", result[1])
assert.are.equal("b", result[2])
assert.are.equal("c", result[3])
end)
it("should handle empty strings in split", function()
-- Both an empty string and nil are expected to yield an empty table
local result = utils.split("", ",")
assert.are.equal(0, #result)
local nil_result = utils.split(nil, ",")
assert.are.equal(0, #nil_result)
end)
end)
describe("Math functions", function()
it("should round numbers to specified decimals", function()
assert.are.equal(3.14, utils.round(3.14159, 2))
assert.are.equal(3, utils.round(3.14159, 0))
assert.are.equal(4, utils.round(3.6))
end)
it("should clamp values within range", function()
assert.are.equal(5, utils.clamp(5, 1, 10))
assert.are.equal(1, utils.clamp(-5, 1, 10))
assert.are.equal(10, utils.clamp(15, 1, 10))
end)
end)
describe("Table functions", function()
local test_table
-- Rebuild the fixture before every test so tests cannot affect each other
before_each(function()
test_table = {"apple", "banana", "cherry"}
end)
it("should find values in table", function()
assert.is_true(utils.table_contains(test_table, "banana"))
assert.is_false(utils.table_contains(test_table, "orange"))
end)
it("should merge two tables", function()
local t1 = {a = 1, b = 2}
local t2 = {c = 3, d = 4}
local merged = utils.table_merge(t1, t2)
assert.are.equal(1, merged.a)
assert.are.equal(2, merged.b)
assert.are.equal(3, merged.c)
assert.are.equal(4, merged.d)
end)
it("should handle overlapping keys in merge", function()
local t1 = {a = 1, b = 2}
local t2 = {b = 3, c = 4}
local merged = utils.table_merge(t1, t2)
-- Values from t2 should override values from t1
assert.are.equal(1, merged.a)
assert.are.equal(3, merged.b) -- overridden by t2
assert.are.equal(4, merged.c)
end)
end)
end)
4. 集成测试
4.1 API集成测试
# t/004-api-integration.t - API integration tests
use Test::Nginx::Socket 'no_plan';
use JSON;
# http{}-level configuration shared by every test block in this file
our $HttpConfig = <<'_EOC_';
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    lua_shared_dict api_cache 10m;
    lua_shared_dict api_stats 10m;
_EOC_
run_tests();
__DATA__
=== TEST 1: 用户注册API测试
--- http_config eval: $::HttpConfig
--- config
location /api/register {
    content_by_lua_block {
        -- cjson.safe returns nil plus an error for malformed input instead
        -- of raising, so a bad body becomes a 400 rather than a 500.
        local cjson = require "cjson.safe"

        -- Read the request body
        ngx.req.read_body()
        local body = ngx.req.get_body_data()
        if not body then
            ngx.status = 400
            ngx.say(cjson.encode({error = "Missing request body"}))
            return
        end

        local data = cjson.decode(body)
        if type(data) ~= "table" then
            ngx.status = 400
            ngx.say(cjson.encode({error = "Invalid JSON body"}))
            return
        end

        -- Validate required fields
        if not data.username or not data.email or not data.password then
            ngx.status = 400
            ngx.say(cjson.encode({error = "Missing required fields"}))
            return
        end

        -- Simulated registration logic
        local user_id = "user_" .. ngx.time()
        local response = {
            success = true,
            user_id = user_id,
            username = data.username,
            email = data.email,
            created_at = ngx.time()
        }
        ngx.header.content_type = "application/json"
        -- cjson does not guarantee object key order, so the expected
        -- pattern below checks each field independently of its position.
        ngx.say(cjson.encode(response))
    }
}
--- request
POST /api/register
{"username":"testuser","email":"test@example.com","password":"password123"}
--- response_body_like
^(?=.*"success":true)(?=.*"user_id":"user_\d+")(?=.*"username":"testuser")(?=.*"email":"test@example\.com")(?=.*"created_at":\d+)\{.*\}
--- no_error_log
[error]
=== TEST 2: 用户登录API测试
--- http_config eval: $::HttpConfig
--- config
location /api/login {
    content_by_lua_block {
        -- cjson.safe returns nil for a missing or malformed body instead
        -- of raising, so bad input falls through to the 401 branch.
        local cjson = require "cjson.safe"
        ngx.req.read_body()
        local body = ngx.req.get_body_data()
        local data = body and cjson.decode(body)
        if type(data) ~= "table" then
            data = {}
        end

        -- Simulated credential check
        if data.username == "testuser" and data.password == "password123" then
            local token = "token_" .. ngx.time() .. "_" .. math.random(1000, 9999)
            local response = {
                success = true,
                token = token,
                expires_in = 3600
            }
            ngx.header.content_type = "application/json"
            -- cjson does not guarantee object key order, so the expected
            -- pattern below checks each field independently.
            ngx.say(cjson.encode(response))
        else
            ngx.status = 401
            ngx.say(cjson.encode({error = "Invalid credentials"}))
        end
    }
}
--- request
POST /api/login
{"username":"testuser","password":"password123"}
--- response_body_like
^(?=.*"success":true)(?=.*"token":"token_\d+_\d+")(?=.*"expires_in":3600)\{.*\}
--- no_error_log
[error]
=== TEST 3: 受保护的API测试
--- http_config eval: $::HttpConfig
--- config
location /api/profile {
    access_by_lua_block {
        -- Reject requests without a "Bearer token_..." Authorization header
        local auth_header = ngx.var.http_authorization
        if not auth_header then
            ngx.status = 401
            ngx.say('{"error":"Missing authorization header"}')
            ngx.exit(401)
        end
        local token = auth_header:match("Bearer%s+(.+)")
        if not token or not token:match("^token_") then
            ngx.status = 401
            ngx.say('{"error":"Invalid token"}')
            ngx.exit(401)
        end
        -- Hand the user information to the content phase via the request context
        ngx.ctx.user_id = "user_123"
        ngx.ctx.username = "testuser"
    }
    content_by_lua_block {
        local cjson = require "cjson"
        local profile = {
            user_id = ngx.ctx.user_id,
            username = ngx.ctx.username,
            email = "test@example.com",
            created_at = ngx.time() - 86400 -- one day ago
        }
        ngx.header.content_type = "application/json"
        -- created_at depends on the current time and cjson does not
        -- guarantee key order, so the response is matched with a pattern
        -- that checks each field independently.
        ngx.say(cjson.encode(profile))
    }
}
--- request
GET /api/profile
--- more_headers
Authorization: Bearer token_1234567890_5678
--- response_body_like
^(?=.*"user_id":"user_123")(?=.*"username":"testuser")(?=.*"email":"test@example\.com")(?=.*"created_at":\d+)\{.*\}
--- no_error_log
[error]
4.2 数据库集成测试
# t/005-database-integration.t - database integration tests
#
# The test blocks share one MySQL table. nginx is started for each block, so
# worker start-up must not wipe the table: TEST 2 reads the row TEST 1 wrote.
use Test::Nginx::Socket 'no_plan';
our $HttpConfig = <<'_EOC_';
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    init_worker_by_lua_block {
        local mysql = require "resty.mysql"

        -- Create the test table when it does not exist yet. Failures are
        -- logged at error level so the "no_error_log" checks surface them.
        local function setup_test_db(premature)
            if premature then
                return
            end
            local db, err = mysql:new()
            if not db then
                ngx.log(ngx.ERR, "failed to create mysql object: ", err)
                return
            end
            db:set_timeout(1000)
            local ok, err = db:connect({
                host = "127.0.0.1",
                port = 3306,
                database = "test",
                user = "test",
                password = "test"
            })
            if not ok then
                ngx.log(ngx.ERR, "failed to connect to test database: ", err)
                return
            end
            local res, err = db:query([[
                CREATE TABLE IF NOT EXISTS test_users (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    username VARCHAR(50) UNIQUE,
                    email VARCHAR(100),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ]])
            if not res then
                ngx.log(ngx.ERR, "failed to create test_users: ", err)
            end
            db:set_keepalive(10000, 100)
        end

        -- Cosockets are disabled in init_worker_by_lua*, so the database
        -- work is deferred to a zero-delay timer, where they are allowed.
        -- NOTE(review): the first request can in principle arrive before the
        -- timer finishes; this assumes the schema usually exists already.
        local ok, err = ngx.timer.at(0, setup_test_db)
        if not ok then
            ngx.log(ngx.ERR, "failed to schedule test database setup: ", err)
        end
    }
_EOC_
run_tests();
__DATA__
=== TEST 1: 创建用户测试
--- http_config eval: $::HttpConfig
--- config
location /db/create_user {
    content_by_lua_block {
        local mysql = require "resty.mysql"
        local cjson = require "cjson.safe"

        ngx.req.read_body()
        local data = cjson.decode(ngx.req.get_body_data() or "")
        if type(data) ~= "table" or not data.username or not data.email then
            ngx.status = 400
            ngx.say(cjson.encode({error = "Invalid request body"}))
            return
        end

        local db = mysql:new()
        if not db then
            ngx.status = 500
            ngx.say(cjson.encode({error = "Database connection failed"}))
            return
        end
        db:set_timeout(1000)
        local ok, err = db:connect({
            host = "127.0.0.1",
            port = 3306,
            database = "test",
            user = "test",
            password = "test"
        })
        if not ok then
            ngx.status = 500
            ngx.say(cjson.encode({error = "Database connection failed"}))
            return
        end

        -- resty.mysql has no placeholder support (the second argument of
        -- query() is a row-count estimate), so values are quoted explicitly.
        local username = ngx.quote_sql_str(tostring(data.username))
        local email = ngx.quote_sql_str(tostring(data.email))

        -- username is UNIQUE: drop a row left behind by an earlier run first
        local res, err = db:query("DELETE FROM test_users WHERE username = " .. username)
        if res then
            -- Insert the user
            res, err = db:query("INSERT INTO test_users (username, email) VALUES ("
                                .. username .. ", " .. email .. ")")
        end
        if not res then
            ngx.status = 500
            ngx.say(cjson.encode({error = "Insert failed: " .. tostring(err)}))
        else
            ngx.say(cjson.encode({
                success = true,
                user_id = res.insert_id,
                affected_rows = res.affected_rows
            }))
        end
        db:set_keepalive(10000, 100)
    }
}
--- request
POST /db/create_user
{"username":"testuser","email":"test@example.com"}
--- response_body_like
^(?=.*"success":true)(?=.*"user_id":\d+)(?=.*"affected_rows":1)\{.*\}
--- no_error_log
[error]
=== TEST 2: 查询用户测试
--- http_config eval: $::HttpConfig
--- config
location /db/get_user {
    content_by_lua_block {
        local mysql = require "resty.mysql"
        local cjson = require "cjson.safe"

        local username = ngx.var.arg_username
        if not username then
            ngx.status = 400
            ngx.say(cjson.encode({error = "Missing username"}))
            return
        end

        local db = mysql:new()
        if not db then
            ngx.status = 500
            ngx.say(cjson.encode({error = "Database connection failed"}))
            return
        end
        db:set_timeout(1000)
        local ok, err = db:connect({
            host = "127.0.0.1",
            port = 3306,
            database = "test",
            user = "test",
            password = "test"
        })
        if not ok then
            ngx.status = 500
            ngx.say(cjson.encode({error = "Database connection failed"}))
            return
        end

        -- Quote the value explicitly; query() does not bind parameters
        local sql = "SELECT * FROM test_users WHERE username = "
                    .. ngx.quote_sql_str(username)
        local res, err = db:query(sql)
        if not res then
            ngx.status = 500
            ngx.say(cjson.encode({error = "Query failed: " .. tostring(err)}))
        elseif #res == 0 then
            ngx.status = 404
            ngx.say(cjson.encode({error = "User not found"}))
        else
            -- cjson does not guarantee key order; the expected pattern
            -- below checks each field independently.
            ngx.say(cjson.encode({
                success = true,
                user = res[1]
            }))
        end
        db:set_keepalive(10000, 100)
    }
}
--- request
GET /db/get_user?username=testuser
--- response_body_like
^(?=.*"success":true)(?=.*"id":\d+)(?=.*"username":"testuser")(?=.*"email":"test@example\.com")(?=.*"created_at":"[^"]+")\{.*\}
--- no_error_log
[error]
5. 性能测试
5.1 基准测试
#!/bin/bash
# scripts/benchmark.sh - performance test script
#
# Runs a series of wrk benchmarks (plus a WebSocket test) against a running
# server and writes one result file per scenario into results/.
# TEST_URL, CONCURRENCY, DURATION and THREADS can be overridden from the
# environment; the defaults match the original hard-coded values.
set -euo pipefail

echo "Starting OpenResty performance tests..."

# Test configuration
TEST_URL="${TEST_URL:-http://localhost:8080}"
CONCURRENCY="${CONCURRENCY:-100}"
DURATION="${DURATION:-30}"
THREADS="${THREADS:-4}"

# Create the results directory
mkdir -p results

# 1. Basic HTTP performance
echo "Testing basic HTTP performance..."
wrk -t"$THREADS" -c"$CONCURRENCY" -d"${DURATION}s" "$TEST_URL/hello" > results/basic_http.txt

# 2. JSON API performance
echo "Testing JSON API performance..."
wrk -t"$THREADS" -c"$CONCURRENCY" -d"${DURATION}s" \
    -H "Content-Type: application/json" \
    -s scripts/post_json.lua \
    "$TEST_URL/api/echo" > results/json_api.txt

# 3. Database query performance
echo "Testing database query performance..."
wrk -t"$THREADS" -c"$CONCURRENCY" -d"${DURATION}s" \
    "$TEST_URL/db/users" > results/database.txt

# 4. Cache performance
# The URL is quoted: "?" is a glob character and must not reach the shell unquoted.
echo "Testing cache performance..."
wrk -t"$THREADS" -c"$CONCURRENCY" -d"${DURATION}s" \
    "$TEST_URL/cache/get?key=test" > results/cache.txt

# 5. WebSocket connections
echo "Testing WebSocket connections..."
node scripts/websocket_test.js > results/websocket.txt

echo "Performance tests completed. Results saved in results/ directory."

# Generate the summary report
echo "Generating summary report..."
python3 scripts/generate_report.py results/
-- scripts/post_json.lua - wrk script that POSTs JSON bodies
local JSON_TYPE = "application/json"

wrk.method = "POST"
wrk.body = '{"message":"Hello, World!","timestamp":' .. os.time() .. '}'
wrk.headers["Content-Type"] = JSON_TYPE

-- Number of requests generated so far by this script instance
local sent = 0

--- Build the next request: a POST to /api/echo with a numbered message.
function request()
  sent = sent + 1
  local payload = table.concat({
    '{"message":"Test message ', sent, '","timestamp":', os.time(), '}',
  })
  return wrk.format("POST", "/api/echo", wrk.headers, payload)
end

--- Report every response whose status is not 200.
function response(status, headers, body)
  if status == 200 then
    return
  end
  print("Error: " .. status .. " - " .. body)
end

--- Print the final summary; latency values are divided by 1000 and labelled "ms".
function done(summary, latency, requests)
  print("Total requests: " .. summary.requests)
  print("Total errors: " .. summary.errors.status)
  local mean_ms = latency.mean / 1000
  print("Average latency: " .. mean_ms .. "ms")
  local p99_ms = latency:percentile(99) / 1000
  print("99th percentile: " .. p99_ms .. "ms")
end
5.2 负载测试
# scripts/load_test.py - 负载测试脚本
import asyncio
import aiohttp
import time
import json
from concurrent.futures import ThreadPoolExecutor
import statistics
class LoadTester:
    """Closed-loop HTTP load generator built on aiohttp.

    ``max_concurrent`` worker coroutines each issue requests back-to-back
    until the test duration elapses, so the configured concurrency is
    actually reached (the previous scheduler started at most one request
    every 10 ms). One result dict per request is collected in ``self.results``.
    """

    def __init__(self, base_url, max_concurrent=100):
        self.base_url = base_url
        self.max_concurrent = max_concurrent
        self.results = []

    async def make_request(self, session, endpoint, method='GET', data=None):
        """Send one request and return its outcome.

        Args:
            session: an open ``aiohttp.ClientSession``.
            endpoint: path appended to ``base_url``.
            method: ``'GET'`` or ``'POST'`` (case-insensitive).
            data: JSON-serialisable payload, sent for POST requests only.

        Returns:
            A dict with ``status``, ``latency`` (milliseconds) and ``success``
            (True only for HTTP 200). Failures, including an unsupported
            method, never raise: they yield ``status`` 0 and an ``error`` text.
        """
        url = f"{self.base_url}{endpoint}"
        # perf_counter is monotonic, unlike the wall clock used previously
        started = time.perf_counter()
        try:
            verb = str(method).upper()
            if verb == 'GET':
                request_ctx = session.get(url)
            elif verb == 'POST':
                request_ctx = session.post(url, json=data)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")
            async with request_ctx as response:
                await response.text()
                status = response.status
        except Exception as e:
            return {
                'status': 0,
                'latency': (time.perf_counter() - started) * 1000,
                'success': False,
                'error': str(e)
            }
        return {
            'status': status,
            'latency': (time.perf_counter() - started) * 1000,  # ms
            'success': status == 200
        }

    async def run_load_test(self, endpoint, duration=60, method='GET', data=None):
        """Drive ``endpoint`` for ``duration`` seconds and return the analysis.

        Results are appended to ``self.results``; callers that reuse the
        tester reset that list between runs.
        """
        print(f"Starting load test for {endpoint}...")
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        timeout = aiohttp.ClientTimeout(total=30)
        deadline = time.monotonic() + duration

        async def worker(session):
            # Each worker keeps exactly one request in flight until the deadline
            while time.monotonic() < deadline:
                outcome = await self.make_request(session, endpoint, method, data)
                self.results.append(outcome)

        async with aiohttp.ClientSession(connector=connector,
                                         timeout=timeout) as session:
            await asyncio.gather(
                *(worker(session) for _ in range(self.max_concurrent))
            )
        return self.analyze_results()

    @staticmethod
    def _percentile(values, pct):
        """Return the ``pct``-th percentile (1-99) of ``values``, 0 when empty."""
        if not values:
            return 0
        if len(values) == 1:
            return values[0]
        # 'inclusive' keeps the estimate inside the observed range, which
        # matters for the small samples that used to be reported as 0.
        return statistics.quantiles(values, n=100, method='inclusive')[pct - 1]

    def analyze_results(self):
        """Summarise ``self.results``.

        Returns:
            ``{}`` when no request was recorded; otherwise a dict of request
            counts, success rate (percent) and latency statistics in
            milliseconds, computed over successful requests only.
        """
        if not self.results:
            return {}
        successful_requests = [r for r in self.results if r['success']]
        failed_requests = [r for r in self.results if not r['success']]
        latencies = [r['latency'] for r in successful_requests]
        analysis = {
            'total_requests': len(self.results),
            'successful_requests': len(successful_requests),
            'failed_requests': len(failed_requests),
            'success_rate': len(successful_requests) / len(self.results) * 100,
            'average_latency': statistics.mean(latencies) if latencies else 0,
            'median_latency': statistics.median(latencies) if latencies else 0,
            'p95_latency': self._percentile(latencies, 95),
            'p99_latency': self._percentile(latencies, 99),
            'min_latency': min(latencies) if latencies else 0,
            'max_latency': max(latencies) if latencies else 0
        }
        return analysis
async def main():
    """Run each configured load test in turn and save all results as JSON.

    Every scenario reuses one LoadTester, whose result list is reset between
    runs. A scenario that records no requests is reported as such rather
    than failing on the missing statistics.
    """
    tester = LoadTester("http://localhost:8080", max_concurrent=50)
    # Endpoints to exercise
    tests = [
        {
            'name': 'Basic HTTP',
            'endpoint': '/hello',
            'method': 'GET',
            'duration': 30
        },
        {
            'name': 'JSON API',
            'endpoint': '/api/echo',
            'method': 'POST',
            'data': {'message': 'test', 'timestamp': time.time()},
            'duration': 30
        },
        {
            'name': 'Database Query',
            'endpoint': '/db/users',
            'method': 'GET',
            'duration': 30
        }
    ]
    results = {}
    for test in tests:
        print(f"\nRunning {test['name']} test...")
        tester.results = []  # reset results from the previous scenario
        result = await tester.run_load_test(
            test['endpoint'],
            duration=test['duration'],
            method=test['method'],
            data=test.get('data')
        )
        results[test['name']] = result
        print(f"Results for {test['name']}:")
        if not result:
            # analyze_results() returns {} when nothing was recorded
            print("  No requests were recorded.")
            continue
        print(f"  Total requests: {result['total_requests']}")
        print(f"  Success rate: {result['success_rate']:.2f}%")
        print(f"  Average latency: {result['average_latency']:.2f}ms")
        print(f"  P95 latency: {result['p95_latency']:.2f}ms")
        print(f"  P99 latency: {result['p99_latency']:.2f}ms")
    # Save the results to a file
    with open('load_test_results.json', 'w') as f:
        json.dump(results, f, indent=2)
    print("\nLoad test completed. Results saved to load_test_results.json")
if __name__ == "__main__":
asyncio.run(main())
6. 调试技术
6.1 日志调试
-- lib/debug_utils.lua - 调试工具模块
local debug_utils = {}
local cjson = require "cjson"
-- Debug levels, ordered from least (ERROR) to most (TRACE) verbose
local DEBUG_LEVELS = {
  ERROR = 1,
  WARN = 2,
  INFO = 3,
  DEBUG = 4,
  TRACE = 5
}
-- Active level, taken from the DEBUG_LEVEL environment variable. An unset or
-- unrecognised value falls back to INFO, so the level is always a number and
-- the comparison in debug_utils.log cannot fail on nil.
local current_level = DEBUG_LEVELS[os.getenv("DEBUG_LEVEL") or "INFO"]
  or DEBUG_LEVELS.INFO
-- 格式化调试信息
--- Build the JSON text for one log entry.
-- @param level level name stored in the entry
-- @param message log message
-- @param context optional table of extra fields (defaults to an empty table)
-- @return JSON string with timestamp, level, worker id, request id, message and context
function debug_utils.format_debug_info(level, message, context)
  return cjson.encode({
    timestamp = ngx.localtime(),
    level = level,
    worker_id = ngx.worker.id() or 0,
    request_id = ngx.ctx.request_id or "unknown",
    message = message,
    context = context or {}
  })
end
-- 记录调试信息
--- Write a structured log entry when `level` is enabled.
-- Unknown level names are treated as INFO. ERROR and WARN map to the
-- matching nginx log levels; every other level is written at ngx.INFO.
-- @param level one of "ERROR", "WARN", "INFO", "DEBUG", "TRACE"
-- @param message log message
-- @param context optional table of extra fields
function debug_utils.log(level, message, context)
  local level_num = DEBUG_LEVELS[level] or DEBUG_LEVELS.INFO
  -- Guard against an unset threshold so the comparison never sees nil
  if level_num > (current_level or DEBUG_LEVELS.INFO) then
    return
  end
  local formatted = debug_utils.format_debug_info(level, message, context)
  if level == "ERROR" then
    ngx.log(ngx.ERR, formatted)
  elseif level == "WARN" then
    ngx.log(ngx.WARN, formatted)
  else
    ngx.log(ngx.INFO, formatted)
  end
end
-- 便捷方法
-- Convenience wrappers: debug_utils.error / warn / info / debug / trace each
-- forward (message, context) to debug_utils.log with the matching level name.
for _, level_name in ipairs({ "ERROR", "WARN", "INFO", "DEBUG", "TRACE" }) do
  debug_utils[level_name:lower()] = function(message, context)
    debug_utils.log(level_name, message, context)
  end
end
-- 性能计时器
-- Start times of running timers, keyed by timer name. The table is local to
-- this module, so every caller of the module shares the same names.
local timers = {}

--- Start (or restart) the named timer.
-- @param name timer identifier
function debug_utils.start_timer(name)
  timers[name] = ngx.now()
end

--- Stop the named timer and log its duration at DEBUG level.
-- @param name timer identifier
-- @return elapsed seconds, or nil when the timer was never started
function debug_utils.end_timer(name)
  local started_at = timers[name]
  if not started_at then
    return nil
  end
  local elapsed = ngx.now() - started_at
  debug_utils.debug("Timer: " .. name, { duration_ms = elapsed * 1000 })
  timers[name] = nil
  return elapsed
end
-- 内存使用监控
--- Log the memory reported by collectgarbage("count") at DEBUG level.
-- @param context optional value attached to the log entry
function debug_utils.log_memory_usage(context)
  local kb = collectgarbage("count")
  local usage = {
    memory_kb = kb,
    memory_mb = kb / 1024,
    context = context
  }
  debug_utils.debug("Memory usage", usage)
end
-- 请求跟踪
--- Log a snapshot of the current request at TRACE level.
-- @param phase label recorded with the snapshot
function debug_utils.trace_request(phase)
  local var = ngx.var
  debug_utils.trace("Request trace", {
    phase = phase,
    method = var.request_method,
    uri = var.request_uri,
    remote_addr = var.remote_addr,
    user_agent = var.http_user_agent,
    request_time = var.request_time
  })
end
-- 变量转储
--- Log a stringified copy of a table at DEBUG level.
-- Keys and leaf values are converted with tostring; anything nested deeper
-- than five levels is replaced by a marker string.
-- @param tbl value to dump
-- @param name optional label (defaults to "unknown")
function debug_utils.dump_table(tbl, name)
  local MAX_DEPTH = 5

  local function serialize(obj, depth)
    depth = depth or 0
    if depth > MAX_DEPTH then
      return "[max depth reached]"
    end
    if type(obj) ~= "table" then
      return tostring(obj)
    end
    local copy = {}
    for key, value in pairs(obj) do
      copy[tostring(key)] = serialize(value, depth + 1)
    end
    return copy
  end

  local label = "Table dump: " .. (name or "unknown")
  debug_utils.debug(label, { table_content = serialize(tbl) })
end
return debug_utils
6.2 性能分析
-- lib/profiler.lua - 性能分析模块
local profiler = {}
local debug_utils = require "debug_utils"
-- 性能统计数据
local stats = {
function_calls = {},
execution_times = {},
memory_usage = {}
}
-- 函数调用包装器
--- Wrap a function so every call records its count, duration and memory delta.
-- All return values of `func` are passed through unchanged, including nil
-- values in the middle or at the end of the list. If `func` raises, the
-- error propagates and only the call count is recorded.
-- NOTE: samples are appended on every call and are only released by
-- profiler.reset_stats().
-- @param func function to instrument
-- @param name key under which statistics are stored
-- @return the instrumented function
function profiler.wrap_function(func, name)
  -- Records the samples, then forwards the results as varargs so that nil
  -- results are preserved (a table plus unpack would drop them).
  local function finish(start_time, start_memory, ...)
    -- ngx.now() returns nginx's cached time; refresh it so a function that
    -- never yields is not always measured as 0.
    ngx.update_time()
    local times = stats.execution_times[name]
    if not times then
      times = {}
      stats.execution_times[name] = times
    end
    times[#times + 1] = ngx.now() - start_time

    local usage = stats.memory_usage[name]
    if not usage then
      usage = {}
      stats.memory_usage[name] = usage
    end
    usage[#usage + 1] = collectgarbage("count") - start_memory
    return ...
  end

  return function(...)
    ngx.update_time()
    local start_time = ngx.now()
    local start_memory = collectgarbage("count")
    -- Record the call
    stats.function_calls[name] = (stats.function_calls[name] or 0) + 1
    -- Run the original function
    return finish(start_time, start_memory, func(...))
  end
end
-- 自动包装模块函数
--- Build an instrumented copy of a module table.
-- Function members are wrapped with profiler.wrap_function under the name
-- "<module_name>.<key>"; every other member is copied as-is.
-- @param module table to wrap
-- @param module_name prefix used for the statistics keys
-- @return a new table; the original module is not modified
function profiler.wrap_module(module, module_name)
  local proxy = {}
  for key, member in pairs(module) do
    if type(member) ~= "function" then
      proxy[key] = member
    else
      proxy[key] = profiler.wrap_function(member, module_name .. "." .. key)
    end
  end
  return proxy
end
-- 获取性能统计
--- Summarise the collected samples.
-- @return table with `function_calls` (the live counter table), plus
--   `execution_times` (count/total/average/min/max in milliseconds) and
--   `memory_usage` (count/total/average in KB) per function name
function profiler.get_stats()
  local report = {
    function_calls = stats.function_calls,
    execution_times = {},
    memory_usage = {}
  }

  -- Execution-time statistics
  for func_name, samples in pairs(stats.execution_times) do
    local n = #samples
    if n > 0 then
      local sum, lowest, highest = 0, samples[1], samples[1]
      for i = 1, n do
        local sample = samples[i]
        sum = sum + sample
        lowest = math.min(lowest, sample)
        highest = math.max(highest, sample)
      end
      report.execution_times[func_name] = {
        count = n,
        total_ms = sum * 1000,
        average_ms = (sum / n) * 1000,
        min_ms = lowest * 1000,
        max_ms = highest * 1000
      }
    end
  end

  -- Memory-usage statistics
  for func_name, samples in pairs(stats.memory_usage) do
    local n = #samples
    if n > 0 then
      local sum = 0
      for i = 1, n do
        sum = sum + samples[i]
      end
      report.memory_usage[func_name] = {
        count = n,
        total_kb = sum,
        average_kb = sum / n
      }
    end
  end

  return report
end
-- 重置统计数据
--- Discard every recorded call count and sample.
function profiler.reset_stats()
  for _, key in ipairs({ "function_calls", "execution_times", "memory_usage" }) do
    stats[key] = {}
  end
end
-- 输出性能报告
--- Log the current statistics at INFO level and return them.
-- @return the table produced by profiler.get_stats()
function profiler.generate_report()
  local report = profiler.get_stats()
  local payload = {
    report_time = ngx.localtime(),
    stats = report
  }
  debug_utils.info("Performance Report", payload)
  return report
end
return profiler
6.3 错误处理与监控
-- lib/error_handler.lua - 错误处理模块
local error_handler = {}
local cjson = require "cjson"
local debug_utils = require "debug_utils"
-- 错误类型定义
-- Error categories; the values are the `type` strings stored in error
-- objects and used as keys for the per-type counters.
local ERROR_TYPES = {
VALIDATION = "validation_error",
DATABASE = "database_error",
NETWORK = "network_error",
AUTHENTICATION = "auth_error",
AUTHORIZATION = "authz_error",
INTERNAL = "internal_error",
EXTERNAL = "external_error"
}
-- Error statistics store.
-- NOTE(review): this local is not referenced anywhere else in the visible
-- module (the functions below read ngx.shared.error_stats directly) - confirm
-- whether it can be removed.
local error_stats = ngx.shared.error_stats or {}
-- 创建错误对象
--- Build an error object describing a failure.
-- @param error_type category string (see ERROR_TYPES)
-- @param message human-readable description
-- @param details optional table of extra data (defaults to an empty table)
-- @return table with type, message, details, timestamp, request_id and stack_trace
function error_handler.create_error(error_type, message, details)
  local err = {
    type = error_type,
    message = message,
    details = details or {},
    timestamp = ngx.time(),
    request_id = ngx.ctx.request_id,
    stack_trace = debug.traceback()
  }
  return err
end
-- 处理错误
--- Log an error, count it, and end the request with a JSON error response.
-- Details and the stack trace are only included in the response when the
-- ENVIRONMENT environment variable equals "development".
-- @param error_obj error object as built by error_handler.create_error
-- @param response_code HTTP status to send (defaults to 500)
function error_handler.handle_error(error_obj, response_code)
  local status = response_code or 500

  -- Log the error
  debug_utils.error("Error occurred", {
    error_type = error_obj.type,
    message = error_obj.message,
    details = error_obj.details,
    stack_trace = error_obj.stack_trace
  })

  -- Update the per-type counters
  error_handler.update_error_stats(error_obj.type)

  -- Send the error response
  ngx.status = status
  ngx.header.content_type = "application/json"
  local payload = {
    type = error_obj.type,
    message = error_obj.message,
    timestamp = error_obj.timestamp,
    request_id = error_obj.request_id
  }
  if os.getenv("ENVIRONMENT") == "development" then
    payload.details = error_obj.details
    payload.stack_trace = error_obj.stack_trace
  end
  ngx.say(cjson.encode({ error = payload }))
  ngx.exit(status)
end
-- 更新错误统计
--- Increment the counter for an error type in the `error_stats` shared dict.
-- The increment is a single atomic operation, so concurrent workers cannot
-- lose updates (a separate get followed by set can). A counter is created
-- at 0 with a one-hour lifetime; unlike the previous get/set, later
-- increments do not extend that lifetime.
-- Does nothing when the shared dict is not configured.
-- @param error_type category string used as the counter key
function error_handler.update_error_stats(error_type)
  local cache = ngx.shared.error_stats
  if not cache then
    return
  end
  -- incr(key, value, init, init_ttl): the counter expires one hour after creation
  local _, err = cache:incr(error_type, 1, 0, 3600)
  if err then
    ngx.log(ngx.WARN, "failed to update error stats for ", error_type, ": ", err)
  end
end
-- 获取错误统计
--- Read the current counter for every known error type.
-- @return table mapping each ERROR_TYPES value to its count (0 when absent);
--   an empty table when the `error_stats` shared dict is not configured
function error_handler.get_error_stats()
  local counts = {}
  local cache = ngx.shared.error_stats
  if cache then
    for _, type_name in pairs(ERROR_TYPES) do
      counts[type_name] = cache:get(type_name) or 0
    end
  end
  return counts
end
-- 包装函数以捕获错误
--- Wrap a function so that a raised error becomes a handled error response.
-- On success every return value of `func` is passed through (previously
-- only the first one survived). On failure an error object is built and
-- handed to error_handler.handle_error; the raw error value is returned in
-- case handle_error returns.
-- @param func function to protect
-- @param error_type optional category (defaults to ERROR_TYPES.INTERNAL)
-- @return the protected function
function error_handler.wrap_with_error_handling(func, error_type)
  -- `unpack` is a global in Lua 5.1/LuaJIT and lives in `table` from 5.2 on
  local unpack_values = table.unpack or unpack

  -- Capture a value list together with its true length, nils included
  local function pack(...)
    return { n = select("#", ...), ... }
  end

  return function(...)
    local results = pack(pcall(func, ...))
    if not results[1] then
      local failure = results[2]
      local error_obj = error_handler.create_error(
        error_type or ERROR_TYPES.INTERNAL,
        "Function execution failed: " .. tostring(failure),
        { function_args = { ... } }
      )
      error_handler.handle_error(error_obj)
      return failure
    end
    -- Skip the leading pcall status flag
    return unpack_values(results, 2, results.n)
  end
end
-- 验证错误
--- Report a validation failure (HTTP 400).
-- @param message description of the problem
-- @param field name of the offending field
function error_handler.validation_error(message, field)
  error_handler.handle_error(
    error_handler.create_error(ERROR_TYPES.VALIDATION, message, { field = field }),
    400
  )
end

--- Report a database failure (HTTP 500).
-- @param message description of the problem
-- @param query the query that failed
function error_handler.database_error(message, query)
  error_handler.handle_error(
    error_handler.create_error(ERROR_TYPES.DATABASE, message, { query = query }),
    500
  )
end

--- Report an authentication failure (HTTP 401).
-- @param message description of the problem
function error_handler.auth_error(message)
  error_handler.handle_error(
    error_handler.create_error(ERROR_TYPES.AUTHENTICATION, message),
    401
  )
end

--- Report an authorization failure (HTTP 403).
-- @param message description of the problem
-- @param required_permission the permission the caller lacked
function error_handler.authz_error(message, required_permission)
  error_handler.handle_error(
    error_handler.create_error(
      ERROR_TYPES.AUTHORIZATION,
      message,
      { required_permission = required_permission }
    ),
    403
  )
end
return error_handler
7. 测试自动化
7.1 CI/CD集成
# .github/workflows/test.yml - GitHub Actions configuration
name: OpenResty Tests

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest

    services:
      mysql:
        image: mysql:8.0
        env:
          MYSQL_ROOT_PASSWORD: root
          MYSQL_DATABASE: test
          MYSQL_USER: test
          MYSQL_PASSWORD: test
        ports:
          - 3306:3306
        options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
      redis:
        image: redis:6
        ports:
          - 6379:6379
        options: --health-cmd="redis-cli ping" --health-interval=10s --health-timeout=5s --health-retries=3

    steps:
      - uses: actions/checkout@v4

      - name: Install OpenResty
        run: |
          sudo apt-get update
          sudo apt-get install -y --no-install-recommends wget gnupg ca-certificates lsb-release
          # Import the repository signing key first; without it apt rejects
          # the OpenResty repository as unsigned.
          wget -O - https://openresty.org/package/pubkey.gpg | sudo gpg --dearmor -o /usr/share/keyrings/openresty.gpg
          echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/openresty.gpg] http://openresty.org/package/ubuntu $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/openresty.list > /dev/null
          sudo apt-get update
          sudo apt-get install -y openresty

      - name: Install Test Dependencies
        run: |
          sudo apt-get install -y cpanminus
          sudo cpanm Test::Nginx
          sudo cpanm JSON

      - name: Install Lua Dependencies
        # NOTE(review): assumes a luarocks binary exists under the OpenResty
        # LuaJIT prefix; the openresty package alone may not provide it - confirm.
        run: |
          sudo /usr/local/openresty/luajit/bin/luarocks install busted
          sudo /usr/local/openresty/luajit/bin/luarocks install lua-cjson

      - name: Setup Test Environment
        run: |
          # Install the test configuration
          sudo mkdir -p /etc/openresty
          sudo cp test/nginx.conf /etc/openresty/
          # Fix ownership
          sudo chown -R $USER:$USER /usr/local/openresty
          # Initialise the test database
          mysql -h 127.0.0.1 -u test -ptest test < test/schema.sql

      - name: Run Unit Tests
        run: |
          cd test
          /usr/local/openresty/luajit/bin/busted spec/

      - name: Run Integration Tests
        run: |
          export PATH=/usr/local/openresty/bin:$PATH
          prove -r t/

      - name: Run Performance Tests
        run: |
          # Start the test server
          sudo /usr/local/openresty/bin/openresty -c /etc/openresty/nginx.conf
          sleep 5
          # Run the benchmarks
          ./scripts/benchmark.sh
          # Stop the server
          sudo /usr/local/openresty/bin/openresty -s stop

      - name: Upload Test Results
        # v3 of this action has been retired by GitHub
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: test-results
          path: |
            results/
            test-output/
7.2 测试覆盖率
-- lib/coverage.lua - 代码覆盖率模块
local coverage = {}
local cjson = require "cjson"
-- 覆盖率数据
local coverage_data = {}
local line_counts = {}
-- 启用覆盖率收集
--- Start collecting coverage by installing coverage.line_hook as a line hook.
-- NOTE(review): under LuaJIT, hooks may not fire for JIT-compiled code, which
-- would leave hot paths looking uncovered - confirm for the target runtime.
function coverage.enable()
debug.sethook(coverage.line_hook, "l")
end
--- Stop collecting coverage by removing the debug hook.
function coverage.disable()
debug.sethook()
end
-- 行执行钩子
--- Debug line hook: count one execution of `line` in the running file.
-- Only chunks loaded from a file whose name ends in ".lua" are recorded.
-- @param event hook event name (unused)
-- @param line line number about to be executed
function coverage.line_hook(event, line)
  -- Level 2 is the function that triggered the hook
  local info = debug.getinfo(2, "S")
  if not (info and info.source) then
    return
  end
  -- Sources that start with "@" were loaded from a file
  local file = info.source:match("^@(.+)$")
  -- "%." matches a literal dot; the previous "\." is not a valid Lua escape
  if not (file and file:match("%.lua$")) then
    return
  end
  local hits = coverage_data[file]
  if not hits then
    hits = {}
    coverage_data[file] = hits
  end
  hits[line] = (hits[line] or 0) + 1
end
-- 获取覆盖率数据
--- Return the raw coverage table: file name -> (line number -> hit count).
-- The module's own table is returned, not a copy.
function coverage.get_data()
return coverage_data
end
--- Compute per-file and overall coverage statistics.
-- For every file with recorded hits, the total line count is read from disk
-- and compared with the number of distinct lines that executed.
-- @return table { files = {[path] = file_stats}, total_lines, covered_lines,
--         coverage_percentage }, where each file_stats holds file,
--         total_lines, covered_lines, coverage_percentage and lines
function coverage.calculate_stats()
    local per_file = {}
    local all_total, all_covered = 0, 0

    for path, hits in pairs(coverage_data) do
        -- Total comes from the file on disk (0 when it cannot be opened).
        local file_total = coverage.count_file_lines(path)
        local file_covered = coverage.count_covered_lines(hits)

        local file_pct = 0
        if file_total > 0 then
            file_pct = (file_covered / file_total) * 100
        end

        per_file[path] = {
            file = path,
            total_lines = file_total,
            covered_lines = file_covered,
            coverage_percentage = file_pct,
            lines = hits
        }

        all_total = all_total + file_total
        all_covered = all_covered + file_covered
    end

    local overall_pct = 0
    if all_total > 0 then
        overall_pct = (all_covered / all_total) * 100
    end

    return {
        files = per_file,
        total_lines = all_total,
        covered_lines = all_covered,
        coverage_percentage = overall_pct
    }
end
--- Count the lines of a file.
-- @param file path of the file to read
-- @return number of lines, or 0 when the file cannot be opened
function coverage.count_file_lines(file)
    local handle = io.open(file, "r")
    if not handle then
        return 0
    end

    local total = 0
    for _ in handle:lines() do
        total = total + 1
    end
    handle:close()
    return total
end
--- Count how many distinct lines have an entry in a hit table.
-- @param lines table mapping line number -> hit count
-- @return number of keys present in `lines`
function coverage.count_covered_lines(lines)
    local covered = 0
    for _line_no in pairs(lines) do
        covered = covered + 1
    end
    return covered
end
-- Replacement table for the characters that are significant in HTML text.
local html_entities = {
    ["&"] = "&amp;",
    ["<"] = "&lt;",
    [">"] = "&gt;",
    ['"'] = "&quot;",
    ["'"] = "&#39;"
}

-- Escape a value for safe inclusion in HTML text or attribute values.
local function escape_html(text)
    -- Parentheses drop gsub's second return value (the substitution count).
    return (tostring(text):gsub("[&<>\"']", html_entities))
end

--- Write an HTML coverage report.
-- The report contains an overall summary followed by one section per file,
-- listing every source line marked as covered or uncovered. Files are
-- emitted in sorted order so the report is stable between runs.
-- @param output_file path of the HTML file to create
-- @return true on success, or nil plus an error message when the output
--         file cannot be opened
function coverage.generate_html_report(output_file)
    local stats = coverage.calculate_stats()

    -- Pieces are collected in a table and joined once at the end; repeated
    -- `..` in the per-line loop would copy the whole report for every line.
    local buf = {}
    local function emit(...)
        for i = 1, select("#", ...) do
            buf[#buf + 1] = tostring((select(i, ...)))
        end
    end

    emit([[
<!DOCTYPE html>
<html>
<head>
<title>Code Coverage Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.summary { background: #f5f5f5; padding: 15px; border-radius: 5px; }
.file { margin: 10px 0; padding: 10px; border: 1px solid #ddd; }
.covered { background-color: #d4edda; }
.uncovered { background-color: #f8d7da; }
.line-number { display: inline-block; width: 50px; text-align: right; }
</style>
</head>
<body>
<h1>Code Coverage Report</h1>
<div class="summary">
<h2>Summary</h2>
<p>Total Lines: ]], stats.total_lines, [[</p>
<p>Covered Lines: ]], stats.covered_lines, [[</p>
<p>Coverage: ]], string.format("%.2f", stats.coverage_percentage), [[%</p>
</div>
]])

    -- pairs() order is unspecified, so sort the file names first.
    local names = {}
    for name in pairs(stats.files) do
        names[#names + 1] = name
    end
    table.sort(names)

    for _, file in ipairs(names) do
        local file_stats = stats.files[file]
        emit([[
<div class="file">
<h3>]], escape_html(file), [[</h3>
<p>Coverage: ]], string.format("%.2f", file_stats.coverage_percentage), [[%</p>
<pre>
]])
        -- Re-read the source and tag every line with its coverage state.
        local source = io.open(file, "r")
        if source then
            local line_num = 0
            for line in source:lines() do
                line_num = line_num + 1
                local hits = file_stats.lines[line_num]
                local class = (hits and hits > 0) and "covered" or "uncovered"
                -- Source text must be escaped: Lua code routinely contains
                -- "<", ">" and "&", which would otherwise break the markup.
                emit('<span class="', class, '">',
                     '<span class="line-number">', line_num, '</span> ',
                     escape_html(line), '</span>\n')
            end
            source:close()
        end
        emit([[
</pre>
</div>
]])
    end

    emit([[
</body>
</html>
]])

    -- Write the HTML file; report the failure instead of dropping it.
    local output, err = io.open(output_file, "w")
    if not output then
        return nil, err
    end
    output:write(table.concat(buf))
    output:close()
    return true
end
--- Discard all collected coverage data.
-- The module starts over with a fresh table; a table previously obtained
-- from coverage.get_data() is left untouched.
function coverage.reset()
    local fresh = {}
    coverage_data = fresh
end
return coverage
7.3 测试报告生成
# scripts/generate_report.py - 测试报告生成脚本
import argparse
import html
import json
import os
import sys
from datetime import datetime
class TestReportGenerator:
    """Aggregate JSON test results from a directory into HTML and JSON reports.

    The following files are read from ``results_dir``; each one is optional:
    ``unit_tests.json``, ``integration_tests.json``,
    ``load_test_results.json`` and ``coverage.json``.
    """

    # Maps each ``report_data`` key to the result file that feeds it.
    _RESULT_FILES = (
        ('unit_tests', 'unit_tests.json'),
        ('integration_tests', 'integration_tests.json'),
        ('performance_tests', 'load_test_results.json'),
        ('coverage', 'coverage.json'),
    )

    def __init__(self, results_dir):
        """Remember the results directory and create an empty report."""
        self.results_dir = results_dir
        self.report_data = {
            'timestamp': datetime.now().isoformat(),
            'summary': {},
            'unit_tests': {},
            'integration_tests': {},
            'performance_tests': {},
            'coverage': {}
        }

    @staticmethod
    def _esc(value):
        """Return ``value`` as text that is safe to embed in HTML."""
        return html.escape(str(value))

    @staticmethod
    def _level(value, good, fair):
        """Map a percentage to the CSS class success / warning / failure."""
        if value >= good:
            return 'success'
        return 'warning' if value >= fair else 'failure'

    def _load_json(self, filename):
        """Parse ``results_dir/filename``; return None when the file is absent."""
        path = os.path.join(self.results_dir, filename)
        if not os.path.exists(path):
            return None
        # JSON is UTF-8 by specification; do not depend on the locale encoding.
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def parse_test_results(self):
        """Load every available result file into ``report_data``."""
        for key, filename in self._RESULT_FILES:
            data = self._load_json(filename)
            if data is not None:
                self.report_data[key] = data

    def calculate_summary(self):
        """Compute totals, success rate and coverage into ``report_data['summary']``."""
        summary = {
            'total_tests': 0,
            'passed_tests': 0,
            'failed_tests': 0,
            'success_rate': 0,
            'coverage_percentage': 0
        }
        # Unit and integration results both contribute to the totals.
        for key in ('unit_tests', 'integration_tests'):
            if key in self.report_data:
                data = self.report_data[key]
                summary['total_tests'] += data.get('total', 0)
                summary['passed_tests'] += data.get('passed', 0)
                summary['failed_tests'] += data.get('failed', 0)
        # Success rate stays 0 when no tests were found (avoids division by zero).
        if summary['total_tests'] > 0:
            summary['success_rate'] = (summary['passed_tests'] / summary['total_tests']) * 100
        if 'coverage' in self.report_data:
            summary['coverage_percentage'] = self.report_data['coverage'].get('coverage_percentage', 0)
        self.report_data['summary'] = summary

    def generate_html_report(self, output_file):
        """Render the HTML report to ``output_file``.

        ``calculate_summary`` must have been called first, because the
        template reads ``report_data['summary']``.
        """
        # Literal CSS braces are doubled ({{ }}) because the template goes
        # through str.format(); single braces would be parsed as fields.
        html_template = """
<!DOCTYPE html>
<html>
<head>
<title>OpenResty Test Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background: #2c3e50; color: white; padding: 20px; border-radius: 5px; }}
.summary {{ background: #ecf0f1; padding: 15px; margin: 20px 0; border-radius: 5px; }}
.section {{ margin: 20px 0; padding: 15px; border: 1px solid #bdc3c7; border-radius: 5px; }}
.success {{ color: #27ae60; }}
.failure {{ color: #e74c3c; }}
.warning {{ color: #f39c12; }}
table {{ width: 100%; border-collapse: collapse; margin: 10px 0; }}
th, td {{ padding: 10px; text-align: left; border-bottom: 1px solid #ddd; }}
th {{ background-color: #f8f9fa; }}
.metric {{ display: inline-block; margin: 10px 20px 10px 0; }}
.metric-value {{ font-size: 24px; font-weight: bold; }}
.metric-label {{ font-size: 14px; color: #666; }}
</style>
</head>
<body>
<div class="header">
<h1>OpenResty Test Report</h1>
<p>Generated on: {timestamp}</p>
</div>
<div class="summary">
<h2>Test Summary</h2>
<div class="metric">
<div class="metric-value {total_class}">{total_tests}</div>
<div class="metric-label">Total Tests</div>
</div>
<div class="metric">
<div class="metric-value success">{passed_tests}</div>
<div class="metric-label">Passed</div>
</div>
<div class="metric">
<div class="metric-value failure">{failed_tests}</div>
<div class="metric-label">Failed</div>
</div>
<div class="metric">
<div class="metric-value {success_class}">{success_rate:.1f}%</div>
<div class="metric-label">Success Rate</div>
</div>
<div class="metric">
<div class="metric-value {coverage_class}">{coverage_percentage:.1f}%</div>
<div class="metric-label">Code Coverage</div>
</div>
</div>
{unit_tests_section}
{integration_tests_section}
{performance_tests_section}
{coverage_section}
</body>
</html>
"""
        summary = self.report_data['summary']
        html_content = html_template.format(
            timestamp=self._esc(self.report_data['timestamp']),
            total_tests=summary['total_tests'],
            passed_tests=summary['passed_tests'],
            failed_tests=summary['failed_tests'],
            success_rate=summary['success_rate'],
            coverage_percentage=summary['coverage_percentage'],
            total_class='success' if summary['failed_tests'] == 0 else 'warning',
            success_class=self._level(summary['success_rate'], 90, 70),
            coverage_class=self._level(summary['coverage_percentage'], 80, 60),
            unit_tests_section=self._generate_unit_tests_section(),
            integration_tests_section=self._generate_integration_tests_section(),
            performance_tests_section=self._generate_performance_tests_section(),
            coverage_section=self._generate_coverage_section()
        )
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(html_content)

    def _generate_unit_tests_section(self):
        """Return the HTML section summarising the unit test results."""
        if not self.report_data.get('unit_tests'):
            return '<div class="section"><h3>Unit Tests</h3><p>No unit test results found.</p></div>'
        data = self.report_data['unit_tests']
        esc = self._esc
        return f"""
<div class="section">
<h3>Unit Tests</h3>
<table>
<tr>
<th>Metric</th>
<th>Value</th>
</tr>
<tr>
<td>Total Tests</td>
<td>{esc(data.get('total', 0))}</td>
</tr>
<tr>
<td>Passed</td>
<td class="success">{esc(data.get('passed', 0))}</td>
</tr>
<tr>
<td>Failed</td>
<td class="failure">{esc(data.get('failed', 0))}</td>
</tr>
<tr>
<td>Execution Time</td>
<td>{esc(data.get('execution_time', 'N/A'))}</td>
</tr>
</table>
</div>
"""

    def _generate_integration_tests_section(self):
        """Return the HTML section with one row per integration test suite."""
        if not self.report_data.get('integration_tests'):
            return '<div class="section"><h3>Integration Tests</h3><p>No integration test results found.</p></div>'
        data = self.report_data['integration_tests']
        esc = self._esc
        suite_rows = ""
        # The suites shown are fixed: results are looked up under these two keys.
        for label, key in (('API Tests', 'api_tests'), ('Database Tests', 'db_tests')):
            suite = data.get(key, {})
            suite_rows += f"""
<tr>
<td>{label}</td>
<td>{esc(suite.get('total', 0))}</td>
<td class="success">{esc(suite.get('passed', 0))}</td>
<td class="failure">{esc(suite.get('failed', 0))}</td>
<td>{esc(suite.get('duration', 'N/A'))}</td>
</tr>
"""
        return f"""
<div class="section">
<h3>Integration Tests</h3>
<table>
<tr>
<th>Test Suite</th>
<th>Total</th>
<th>Passed</th>
<th>Failed</th>
<th>Duration</th>
</tr>
{suite_rows}
</table>
</div>
"""

    def _generate_performance_tests_section(self):
        """Return the HTML section with one row per performance test."""
        if not self.report_data.get('performance_tests'):
            return '<div class="section"><h3>Performance Tests</h3><p>No performance test results found.</p></div>'
        data = self.report_data['performance_tests']
        esc = self._esc
        table_rows = ""
        # NOTE(review): assumes the file maps test name -> metrics dict with
        # numeric rate/latency values; confirm against the benchmark output.
        for test_name, test_data in data.items():
            table_rows += f"""
<tr>
<td>{esc(test_name)}</td>
<td>{esc(test_data.get('total_requests', 'N/A'))}</td>
<td>{test_data.get('success_rate', 0):.1f}%</td>
<td>{test_data.get('average_latency', 0):.2f}ms</td>
<td>{test_data.get('p95_latency', 0):.2f}ms</td>
<td>{test_data.get('p99_latency', 0):.2f}ms</td>
</tr>
"""
        return f"""
<div class="section">
<h3>Performance Tests</h3>
<table>
<tr>
<th>Test</th>
<th>Total Requests</th>
<th>Success Rate</th>
<th>Avg Latency</th>
<th>P95 Latency</th>
<th>P99 Latency</th>
</tr>
{table_rows}
</table>
</div>
"""

    def _generate_coverage_section(self):
        """Return the HTML section with overall and per-file coverage."""
        if not self.report_data.get('coverage'):
            return '<div class="section"><h3>Code Coverage</h3><p>No coverage data found.</p></div>'
        data = self.report_data['coverage']
        esc = self._esc
        file_rows = ""
        if 'files' in data:
            for file_path, file_data in data['files'].items():
                coverage_pct = file_data.get('coverage_percentage', 0)
                coverage_class = self._level(coverage_pct, 80, 60)
                file_rows += f"""
<tr>
<td>{esc(file_path)}</td>
<td>{esc(file_data.get('total_lines', 0))}</td>
<td>{esc(file_data.get('covered_lines', 0))}</td>
<td class="{coverage_class}">{coverage_pct:.1f}%</td>
</tr>
"""
        return f"""
<div class="section">
<h3>Code Coverage</h3>
<p>Overall Coverage: <strong>{data.get('coverage_percentage', 0):.1f}%</strong></p>
<table>
<tr>
<th>File</th>
<th>Total Lines</th>
<th>Covered Lines</th>
<th>Coverage</th>
</tr>
{file_rows}
</table>
</div>
"""

    def generate_json_report(self, output_file):
        """Dump ``report_data`` as indented UTF-8 JSON to ``output_file``."""
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.report_data, f, indent=2, ensure_ascii=False)

    def generate_reports(self, output_dir):
        """Parse results, write both reports into ``output_dir`` and print a summary.

        Returns True when no test failed, False otherwise.
        """
        os.makedirs(output_dir, exist_ok=True)
        self.parse_test_results()
        self.calculate_summary()

        html_file = os.path.join(output_dir, 'test_report.html')
        self.generate_html_report(html_file)
        print(f"HTML report generated: {html_file}")

        json_file = os.path.join(output_dir, 'test_report.json')
        self.generate_json_report(json_file)
        print(f"JSON report generated: {json_file}")

        summary = self.report_data['summary']
        print(f"\nTest Summary:")
        print(f" Total Tests: {summary['total_tests']}")
        print(f" Passed: {summary['passed_tests']}")
        print(f" Failed: {summary['failed_tests']}")
        print(f" Success Rate: {summary['success_rate']:.1f}%")
        print(f" Code Coverage: {summary['coverage_percentage']:.1f}%")
        return summary['failed_tests'] == 0
def main():
    """Command-line entry point: build the reports and exit with a status code.

    Exits with 0 when every test passed and 1 when a test failed or the
    results directory is missing.
    """
    parser = argparse.ArgumentParser(description='Generate OpenResty test reports')
    parser.add_argument('results_dir', help='Directory containing test results')
    parser.add_argument('--output', '-o', default='reports', help='Output directory for reports')
    args = parser.parse_args()

    # isdir (not exists): a regular file passed by mistake would otherwise be
    # accepted and silently produce an empty report.
    if not os.path.isdir(args.results_dir):
        print(f"Error: Results directory '{args.results_dir}' does not exist or is not a directory",
              file=sys.stderr)
        sys.exit(1)

    generator = TestReportGenerator(args.results_dir)
    success = generator.generate_reports(args.output)
    # A failing test run is reported through exit status 1.
    sys.exit(0 if success else 1)


if __name__ == '__main__':
    main()
8. 最佳实践
8.1 测试策略
测试金字塔原则
- 大量单元测试(快速、隔离)
- 适量集成测试(关键路径)
- 少量端到端测试(用户场景)
测试驱动开发(TDD)
- 先写测试,再写实现
- 红-绿-重构循环
- 保持测试简单明确
持续集成
- 每次提交都运行测试
- 快速反馈机制
- 自动化部署流程
8.2 调试技巧
日志策略
- 结构化日志记录
- 适当的日志级别
- 请求跟踪和关联
性能分析
- 定期性能基准测试
- 内存使用监控
- 热点函数识别
错误处理
- 统一错误处理机制
- 详细错误信息记录
- 优雅降级策略
8.3 工具选择
测试框架
- Test::Nginx:专为OpenResty设计
- Busted:现代Lua测试框架
- 自定义测试工具:特定需求
性能测试
- wrk:HTTP基准测试
- JMeter:复杂场景测试
- 自定义负载测试:业务场景
监控工具
- Prometheus:指标收集
- Grafana:可视化展示
- ELK:日志分析
通过本章的学习,你应该掌握了OpenResty应用的完整测试和调试体系,包括单元测试、集成测试、性能测试、代码覆盖率分析、错误处理和监控等各个方面。这些技能对于构建高质量、可维护的OpenResty应用至关重要。