1. 微服务架构概述
1.1 微服务架构的特点
微服务架构是一种将单一应用程序开发为一组小型服务的方法,其主要特点如下:
- 服务独立:每个服务可以独立开发、部署和扩展
- 技术多样性:不同服务可以使用不同的技术栈
- 数据隔离:每个服务拥有自己的数据存储
- 故障隔离:单个服务的故障不会影响整个系统
- 团队自治:小团队可以独立负责特定服务
1.2 OpenResty在微服务中的角色
┌─────────────────────────────────────────────────────────┐
│ 客户端应用 │
└─────────────────┬───────────────────────────────────────┘
│
┌─────────────────▼───────────────────────────────────────┐
│ OpenResty API网关 │
│ ┌─────────────┬─────────────┬─────────────┬─────────┐ │
│ │ 认证鉴权 │ 路由转发 │ 限流熔断 │ 监控 │ │
│ └─────────────┴─────────────┴─────────────┴─────────┘ │
└─────────┬───────────┬───────────┬───────────┬───────────┘
│ │ │ │
┌─────────▼─┐ ┌───────▼─┐ ┌───────▼─┐ ┌───────▼─┐
│ 用户服务 │ │ 订单服务 │ │ 支付服务 │ │ 通知服务 │
└───────────┘ └─────────┘ └─────────┘ └─────────┘
1.3 API网关的核心功能
- 请求路由:根据规则将请求转发到相应的微服务
- 协议转换:支持HTTP、WebSocket、gRPC等多种协议
- 认证授权:统一的身份验证和权限控制
- 限流熔断:保护后端服务免受过载
- 监控日志:统一的请求监控和日志记录
- 缓存:减少对后端服务的请求
- 数据转换:请求和响应的格式转换
2. API网关基础架构
2.1 网关核心配置
# nginx.conf - API gateway configuration
http {
    # Access log format: request basics, upstream timings, upstream address
    # and the nginx-generated request ID.
    log_format gateway_log '$remote_addr - $remote_user [$time_local] '
                           '"$request" $status $body_bytes_sent '
                           '"$http_referer" "$http_user_agent" '
                           'rt=$request_time uct="$upstream_connect_time" '
                           'uht="$upstream_header_time" urt="$upstream_response_time" '
                           'service="$upstream_addr" request_id="$request_id"';

    # Shared memory zones used by the gateway Lua modules
    lua_shared_dict gateway_cache 100m;
    lua_shared_dict rate_limit 10m;
    lua_shared_dict circuit_breaker 10m;
    lua_shared_dict service_discovery 50m;

    # Lua module search path
    lua_package_path "/usr/local/openresty/lualib/gateway/?.lua;;";

    # Master-process and per-worker initialisation
    init_by_lua_block {
        require "gateway.init"
    }
    init_worker_by_lua_block {
        require "gateway.worker"
    }

    # Microservice backends
    upstream user_service {
        server 192.168.1.10:8001;
        server 192.168.1.11:8001;
        keepalive 32;
    }
    upstream order_service {
        server 192.168.1.20:8002;
        server 192.168.1.21:8002;
        keepalive 32;
    }
    upstream payment_service {
        server 192.168.1.30:8003;
        server 192.168.1.31:8003;
        keepalive 32;
    }

    # API gateway virtual server
    server {
        listen 80;
        server_name api.example.com;
        access_log /var/log/nginx/gateway.log gateway_log;

        # Declared so that Lua code may assign ngx.var.upstream_service;
        # assigning an nginx variable that was never declared raises an error.
        set $upstream_service "";

        # Gateway pipeline, inherited by every location that does not override it.
        # rewrite_handler() is called here, right after access_handler(), because
        # nginx runs the rewrite phase BEFORE the access phase: a
        # rewrite_by_lua_block would execute before access_handler() has stored
        # the resolved route in ngx.ctx and would therefore do nothing.
        access_by_lua_block {
            local gateway = require "gateway.core"
            gateway.access_handler()
            gateway.rewrite_handler()
        }

        # User service
        # NOTE(review): proxy_pass contains a variable, so when $service_path is
        # non-empty it is sent upstream as the complete URI and the query string
        # is not appended automatically - confirm this is intended.
        location ~* ^/api/v1/users(/.*)?$ {
            set $service_name "user_service";
            set $service_path $1;
            proxy_pass http://user_service$service_path;
            header_filter_by_lua_block {
                local gateway = require "gateway.core"
                gateway.header_filter_handler()
            }
            log_by_lua_block {
                local gateway = require "gateway.core"
                gateway.log_handler()
            }
        }

        # Order service
        location ~* ^/api/v1/orders(/.*)?$ {
            set $service_name "order_service";
            set $service_path $1;
            proxy_pass http://order_service$service_path;
            header_filter_by_lua_block {
                local gateway = require "gateway.core"
                gateway.header_filter_handler()
            }
            log_by_lua_block {
                local gateway = require "gateway.core"
                gateway.log_handler()
            }
        }

        # Payment service
        location ~* ^/api/v1/payments(/.*)?$ {
            set $service_name "payment_service";
            set $service_path $1;
            proxy_pass http://payment_service$service_path;
            header_filter_by_lua_block {
                local gateway = require "gateway.core"
                gateway.header_filter_handler()
            }
            log_by_lua_block {
                local gateway = require "gateway.core"
                gateway.log_handler()
            }
        }

        # Health check: must be reachable without a token, so the inherited
        # gateway access handler is replaced by a no-op.
        location /health {
            access_by_lua_block {
                return
            }
            content_by_lua_block {
                local gateway = require "gateway.health"
                gateway.health_check()
            }
        }

        # Admin interface: protected by the IP allow-list only; the inherited
        # gateway access handler (JWT + route lookup) is replaced by a no-op.
        location /admin/ {
            allow 192.168.1.0/24;
            deny all;
            access_by_lua_block {
                return
            }
            content_by_lua_block {
                local admin = require "gateway.admin"
                admin.handle_request()
            }
        }
    }
}
2.2 网关核心模块
-- gateway/core.lua - 网关核心模块
local core = {}
local cjson = require "cjson"
local auth = require "gateway.auth"
local router = require "gateway.router"
local rate_limiter = require "gateway.rate_limiter"
local circuit_breaker = require "gateway.circuit_breaker"
local transformer = require "gateway.transformer"
local monitor = require "gateway.monitor"
--- Build a request identifier.
-- Format: "<epoch seconds>-<worker pid>-<random number in 10000..99999>".
-- @treturn string the generated identifier
local function generate_request_id()
    local parts = {
        ngx.time(),
        ngx.worker.pid(),
        math.random(10000, 99999),
    }
    return string.format("%d-%d-%d", parts[1], parts[2], parts[3])
end
--- Send a JSON error response and terminate the current request.
-- @tparam number status HTTP status code to respond with
-- @tparam string message value of the "error" field in the JSON body
-- @tparam string request_id identifier echoed back to the client
local function reject_request(status, message, request_id)
    ngx.status = status
    ngx.header.content_type = "application/json"
    ngx.say(cjson.encode({
        error = message,
        request_id = request_id
    }))
    return ngx.exit(status)
end

--- Access-phase handler.
-- Runs authentication, route resolution, authorization, rate limiting and the
-- circuit-breaker check, in that order. Any failing step answers with a JSON
-- error (401 / 404 / 403 / 429 / 503) and ends the request. On success the
-- request ID, start time, user and route are left in ngx.ctx for later phases.
function core.access_handler()
    -- $request_id is a built-in nginx variable and is not assignable from Lua
    -- (writing ngx.var.request_id raises an error), so reuse the value nginx
    -- generated. This also keeps the ID identical to the one in the access log.
    -- Fall back to a locally generated ID if the variable is unavailable.
    local request_id = ngx.var.request_id or generate_request_id()
    ngx.ctx.request_id = request_id
    ngx.ctx.start_time = ngx.now()

    -- Mark the start of the request for the monitoring module
    monitor.record_request_start()

    -- Authentication
    local auth_result = auth.authenticate()
    if not auth_result.success then
        return reject_request(auth_result.status or 401,
            auth_result.error or "Authentication failed", request_id)
    end
    ngx.ctx.user = auth_result.user

    -- Route resolution
    local route = router.resolve_route()
    if not route then
        return reject_request(404, "Route not found", request_id)
    end
    ngx.ctx.route = route

    -- Authorization
    if not auth.authorize(ngx.ctx.user, route) then
        return reject_request(403, "Access denied", request_id)
    end

    -- Rate limiting
    local rate_limit_result = rate_limiter.check_rate_limit(ngx.ctx.user, route)
    if not rate_limit_result.allowed then
        -- Headers must be set before the body is sent by reject_request()
        ngx.header["X-RateLimit-Limit"] = rate_limit_result.limit
        ngx.header["X-RateLimit-Remaining"] = rate_limit_result.remaining
        ngx.header["X-RateLimit-Reset"] = rate_limit_result.reset_time
        return reject_request(429, "Rate limit exceeded", request_id)
    end

    -- Circuit breaker
    local circuit_result = circuit_breaker.check_circuit(route.service)
    if circuit_result.state == "open" then
        return reject_request(503, "Service temporarily unavailable", request_id)
    end
end
--- Prepare the request for proxying.
-- Does nothing unless a route was stored in ngx.ctx.route. Otherwise it applies
-- the request transformation, records the target service in an nginx variable
-- and adds tracing / identity headers for the upstream service.
function core.rewrite_handler()
    local route = ngx.ctx.route
    if not route then
        return
    end

    -- Request transformation
    transformer.transform_request(route)

    -- Record the upstream service. The nginx configuration declares
    -- $service_name in every proxied location; assigning a variable that is
    -- not declared in the configuration (the former $upstream_service) raises
    -- a Lua error.
    ngx.var.service_name = route.service

    -- Tracing headers
    ngx.req.set_header("X-Request-ID", ngx.ctx.request_id)
    ngx.req.set_header("X-Gateway-Version", "1.0")

    local user = ngx.ctx.user
    if user then
        ngx.req.set_header("X-User-ID", user.user_id)
        ngx.req.set_header("X-User-Role", user.role)
    else
        -- Never forward identity headers supplied by the client itself
        ngx.req.clear_header("X-User-ID")
        ngx.req.clear_header("X-User-Role")
    end
end
--- Header-filter handler.
-- For routed requests: adds the gateway response headers, lets the transformer
-- adjust the response headers and stores the response status in ngx.ctx.
function core.header_filter_handler()
    local ctx = ngx.ctx
    local route = ctx.route
    if route then
        -- Gateway response headers
        ngx.header["X-Request-ID"] = ctx.request_id
        ngx.header["X-Gateway"] = "OpenResty-Gateway"

        -- Response header transformation
        transformer.transform_response_headers(route)

        -- Keep the status for later phases
        ctx.response_status = ngx.status
    end
end
--- Log-phase handler.
-- Computes the elapsed time in milliseconds since ngx.ctx.start_time, reports
-- the outcome to the circuit breaker (status < 500 counts as success) and
-- hands a metrics record to the monitoring module.
function core.log_handler()
    local ctx = ngx.ctx
    local finished_at = ngx.now()
    local started_at = ctx.start_time or finished_at
    local elapsed_ms = (finished_at - started_at) * 1000

    local route = ctx.route
    local user = ctx.user

    -- Feed the circuit breaker only for routed requests
    if route then
        circuit_breaker.record_result(route.service, ngx.status < 500, elapsed_ms)
    end

    -- Monitoring record
    local record = {
        request_id = ctx.request_id,
        service = route and route.service,
        method = ngx.var.request_method,
        uri = ngx.var.uri,
        status = ngx.status,
        response_time = elapsed_ms,
        user_id = user and user.user_id
    }
    monitor.record_request_end(record)
end
return core
3. 服务发现与路由
3.1 动态路由配置
-- gateway/router.lua - 路由模块
local router = {}
local cjson = require "cjson"
local service_discovery = require "gateway.service_discovery"
-- Static route table. Each entry maps a URI pattern to a backend service and
-- carries the per-route policy: allowed methods, auth flag, rate limit,
-- timeouts and optional retry / security settings.
local routes = {
    -- User service
    {
        id = "user_service",
        pattern = "^/api/v1/users(/.*)?$",
        service = "user_service",
        methods = { "GET", "POST", "PUT", "DELETE" },
        auth_required = true,
        rate_limit = { requests_per_minute = 100, burst = 20 },
        timeout = { connect = 5, send = 10, read = 30 },
        retry = { times = 3, delay = 1 }
    },
    -- Order service
    {
        id = "order_service",
        pattern = "^/api/v1/orders(/.*)?$",
        service = "order_service",
        methods = { "GET", "POST", "PUT", "DELETE" },
        auth_required = true,
        rate_limit = { requests_per_minute = 200, burst = 50 },
        timeout = { connect = 5, send = 10, read = 30 }
    },
    -- Payment service
    {
        id = "payment_service",
        pattern = "^/api/v1/payments(/.*)?$",
        service = "payment_service",
        methods = { "POST", "GET" },
        auth_required = true,
        rate_limit = { requests_per_minute = 50, burst = 10 },
        timeout = { connect = 3, send = 5, read = 15 },
        security = {
            require_https = true,
            allowed_roles = { "user", "admin" }
        }
    }
}
--- Tell whether an HTTP method appears in a route's method list.
-- @tparam table|nil methods list of allowed method names
-- @tparam string method request method
-- @treturn boolean
local function method_is_allowed(methods, method)
    -- A route without a method list allows nothing (instead of raising)
    for _, allowed_method in ipairs(methods or {}) do
        if method == allowed_method then
            return true
        end
    end
    return false
end

--- Resolve the route for the current request.
-- A route matches when its pattern matches ngx.var.uri, the request method is
-- allowed and service discovery returns at least one instance; the instances
-- are then stored in route.instances.
-- @treturn table|nil the matching route, or nil when none applies
function router.resolve_route()
    local uri = ngx.var.uri
    local method = ngx.var.request_method

    for _, route in ipairs(routes) do
        -- Route patterns use regex syntax such as "(/.*)?$". Lua's
        -- string.match has no group quantifiers and treats the "?" as a
        -- literal character, so these patterns never match a normal URI.
        -- Use the PCRE engine instead; "i" mirrors the case-insensitive
        -- "~*" nginx locations, "jo" enables JIT and compile-once caching.
        local from, _, re_err = ngx.re.find(uri, route.pattern, "ijo")
        if re_err then
            ngx.log(ngx.ERR, "Invalid pattern for route ", route.id, ": ", re_err)
        end

        if from and method_is_allowed(route.methods, method) then
            -- Look up the service instances
            local service_instances = service_discovery.get_service_instances(route.service)
            if service_instances and #service_instances > 0 then
                route.instances = service_instances
                return route
            end
        end
    end

    return nil
end
--- Look up a route by its identifier.
-- @param route_id identifier to search for
-- @treturn table|nil the route whose id equals route_id, or nil
function router.get_route_config(route_id)
    local found = nil
    for _, candidate in ipairs(routes) do
        if candidate.id == route_id then
            found = candidate
            break
        end
    end
    return found
end
--- Add a route, or replace the existing route with the same id.
-- @tparam table route_config must provide id, pattern and service
-- @treturn boolean success flag
-- @treturn string "Route updated", "Route added" or an error message
function router.add_route(route_config)
    -- Validate the mandatory fields
    local is_complete = route_config.id and route_config.pattern and route_config.service
    if not is_complete then
        return false, "Invalid route configuration"
    end

    -- Find an existing route with the same id
    local existing_index
    for index, candidate in ipairs(routes) do
        if candidate.id == route_config.id then
            existing_index = index
            break
        end
    end

    if existing_index then
        routes[existing_index] = route_config
        return true, "Route updated"
    end

    -- No match: append as a new route
    routes[#routes + 1] = route_config
    return true, "Route added"
end
--- Remove the first route whose id equals route_id.
-- @param route_id identifier of the route to drop
-- @treturn boolean true when a route was removed
-- @treturn string "Route removed" or "Route not found"
function router.remove_route(route_id)
    local position
    for index, candidate in ipairs(routes) do
        if candidate.id == route_id then
            position = index
            break
        end
    end

    if not position then
        return false, "Route not found"
    end

    table.remove(routes, position)
    return true, "Route removed"
end
--- Return the route table.
-- The table itself is returned, not a copy, so changes made by the caller
-- are visible to the router.
-- @treturn table list of route definitions
function router.get_all_routes()
    local all_routes = routes
    return all_routes
end
--- Summarise the health of the instances behind a route.
-- @tparam table route route definition (uses route.id and route.service)
-- @treturn table route_id, service, total_instances, healthy_instances and
--   health_ratio (healthy / total, or 0 when there are no instances)
function router.check_route_health(route)
    local instances = service_discovery.get_service_instances(route.service)

    local total = 0
    local healthy = 0
    if instances then
        total = #instances
        for _, instance in ipairs(instances) do
            if instance.status == "healthy" then
                healthy = healthy + 1
            end
        end
    end

    local ratio = 0
    if total > 0 then
        ratio = healthy / total
    end

    return {
        route_id = route.id,
        service = route.service,
        total_instances = total,
        healthy_instances = healthy,
        health_ratio = ratio
    }
end
return router
3.2 服务发现实现
-- gateway/service_discovery.lua - 服务发现模块
local service_discovery = {}
local cjson = require "cjson"
local http = require "resty.http"
-- In-memory service registry: service name -> list of instance records.
-- Every seeded instance starts as "healthy" with weight 100 and last_check
-- set to the module load time.
local services
do
    -- Build one instance record; metadata is optional.
    local function seed_instance(id, host, port, metadata)
        return {
            id = id,
            host = host,
            port = port,
            weight = 100,
            status = "healthy",
            last_check = ngx.time(),
            metadata = metadata
        }
    end

    services = {
        user_service = {
            seed_instance("user-service-1", "192.168.1.10", 8001,
                { version = "1.0.0", region = "us-west-1" }),
            seed_instance("user-service-2", "192.168.1.11", 8001,
                { version = "1.0.0", region = "us-west-1" })
        },
        order_service = {
            seed_instance("order-service-1", "192.168.1.20", 8002),
            seed_instance("order-service-2", "192.168.1.21", 8002)
        },
        payment_service = {
            seed_instance("payment-service-1", "192.168.1.30", 8003)
        }
    }
end
--- Return all registered instances of a service.
-- @param service_name registry key
-- @treturn table the registered instance list, or a new empty table when the
--   service is unknown
function service_discovery.get_service_instances(service_name)
    local registered = services[service_name]
    if registered then
        return registered
    end
    return {}
end
--- Return the instances of a service whose status is "healthy".
-- @param service_name registry key
-- @treturn table new list containing only the healthy instances
function service_discovery.get_healthy_instances(service_name)
    local healthy = {}
    for _, candidate in ipairs(services[service_name] or {}) do
        if candidate.status == "healthy" then
            healthy[#healthy + 1] = candidate
        end
    end
    return healthy
end
--- Register an instance, replacing any existing instance with the same id.
-- The service entry is created on first use.
-- @param service_name registry key
-- @tparam table instance instance record (matched by instance.id)
-- @treturn boolean always true
function service_discovery.register_service(service_name, instance)
    local registered = services[service_name]
    if not registered then
        registered = {}
        services[service_name] = registered
    end

    -- Replace in place when the id is already known
    for slot, current in ipairs(registered) do
        if current.id == instance.id then
            registered[slot] = instance
            return true
        end
    end

    -- Otherwise append
    registered[#registered + 1] = instance
    return true
end
--- Remove an instance from a service.
-- @param service_name registry key
-- @param instance_id id of the instance to remove
-- @treturn boolean true when an instance was removed, false otherwise
function service_discovery.deregister_service(service_name, instance_id)
    local registered = services[service_name]
    if not registered then
        return false
    end

    local position
    for slot, current in ipairs(registered) do
        if current.id == instance_id then
            position = slot
            break
        end
    end

    if position then
        table.remove(registered, position)
        return true
    end
    return false
end
--- Set the status of one instance and refresh its last_check timestamp.
-- @param service_name registry key
-- @param instance_id id of the instance to update
-- @param status new status value
-- @treturn boolean true when the instance was found and updated
function service_discovery.update_instance_status(service_name, instance_id, status)
    local registered = services[service_name]
    if registered then
        for _, current in ipairs(registered) do
            if current.id == instance_id then
                current.status = status
                current.last_check = ngx.time()
                return true
            end
        end
    end
    return false
end
--- Probe one instance with GET http://<host>:<port>/health.
-- @tparam table instance record providing host, port and id
-- @treturn boolean true only when the probe answers with status 200
function service_discovery.health_check_instance(instance)
    local client = http.new()
    client:set_timeout(5000) -- 5 second timeout

    local endpoint = string.format("http://%s:%d/health", instance.host, instance.port)
    local request_options = {
        method = "GET",
        headers = {
            ["User-Agent"] = "Gateway-HealthChecker/1.0"
        }
    }

    local res, err = client:request_uri(endpoint, request_options)
    if res then
        return res.status == 200
    end

    ngx.log(ngx.WARN, "Health check failed for ", instance.id, ": ", err)
    return false
end
--- Probe every registered instance and update its status and last_check.
-- Status changes are logged at INFO level.
-- @treturn table service name -> list of { instance_id, status, response_time }
function service_discovery.health_check_all_services()
    local report = {}

    for service_name, instances in pairs(services) do
        local service_report = {}
        report[service_name] = service_report

        for _, instance in ipairs(instances) do
            local new_status = "unhealthy"
            if service_discovery.health_check_instance(instance) then
                new_status = "healthy"
            end

            if instance.status ~= new_status then
                ngx.log(ngx.INFO, "Service instance ", instance.id, " status changed: ",
                    instance.status, " -> ", new_status)
                instance.status = new_status
            end
            instance.last_check = ngx.time()

            service_report[#service_report + 1] = {
                instance_id = instance.id,
                status = new_status,
                response_time = 0 -- placeholder: the probe latency is not measured
            }
        end
    end

    return report
end
--- Replace registry entries with the passing instances reported by Consul.
-- Fetches the service catalog, then the passing health entries of each
-- service. A service whose health query fails or cannot be parsed keeps its
-- current registry entry.
-- @tparam string consul_url base URL of the Consul HTTP API
-- @treturn boolean false when the catalog cannot be fetched or parsed,
--   true otherwise
function service_discovery.sync_from_consul(consul_url)
    local httpc = http.new()
    httpc:set_timeout(10000)

    local res, err = httpc:request_uri(consul_url .. "/v1/catalog/services", {
        method = "GET"
    })
    if not res or res.status ~= 200 then
        ngx.log(ngx.ERR, "Failed to fetch services from Consul: ", err or res.status)
        return false
    end

    -- cjson.decode raises on malformed input; do not let a bad response body
    -- abort the caller (typically a timer).
    local decoded, consul_services = pcall(cjson.decode, res.body)
    if not decoded or type(consul_services) ~= "table" then
        ngx.log(ngx.ERR, "Invalid service catalog received from Consul")
        return false
    end

    for service_name in pairs(consul_services) do
        -- Fetch the passing instances; the name is escaped because it becomes
        -- part of the request path.
        local service_res, service_err = httpc:request_uri(
            consul_url .. "/v1/health/service/" .. ngx.escape_uri(service_name) .. "?passing=true", {
                method = "GET"
            })

        if service_res and service_res.status == 200 then
            local parsed, entries = pcall(cjson.decode, service_res.body)
            if parsed and type(entries) == "table" then
                local instances = {}
                for _, entry in ipairs(entries) do
                    local service = entry.Service

                    -- Consul leaves Service.Address empty when the service is
                    -- reachable at the node's own address.
                    local host = service.Address
                    if (host == nil or host == cjson.null or host == "")
                        and type(entry.Node) == "table" then
                        host = entry.Node.Address
                    end

                    -- JSON null decodes to cjson.null, which is truthy
                    local metadata = service.Meta
                    if type(metadata) ~= "table" then
                        metadata = {}
                    end

                    instances[#instances + 1] = {
                        id = service.ID,
                        host = host,
                        port = service.Port,
                        weight = 100,
                        status = "healthy",
                        last_check = ngx.time(),
                        metadata = metadata
                    }
                end
                services[service_name] = instances
            else
                ngx.log(ngx.WARN, "Invalid health response from Consul for ", service_name)
            end
        else
            ngx.log(ngx.WARN, "Failed to fetch instances of ", service_name, " from Consul: ",
                service_err or (service_res and service_res.status))
        end
    end

    return true
end
--- Start the periodic health check (first run after 30 s, then every 30 s).
-- The timer re-arms itself after each run and stops when nginx signals a
-- premature expiry (worker shutdown).
function service_discovery.start_health_check_timer()
    local function health_check_timer(premature)
        if premature then
            return
        end

        -- Guard the run: an error raised by a check would otherwise abort this
        -- callback before it re-arms and silently stop all future checks.
        local ran, run_err = pcall(service_discovery.health_check_all_services)
        if not ran then
            ngx.log(ngx.ERR, "Health check run failed: ", run_err)
        end

        -- Re-arm the timer (one check every 30 seconds)
        local ok, err = ngx.timer.at(30, health_check_timer)
        if not ok then
            ngx.log(ngx.ERR, "Failed to create health check timer: ", err)
        end
    end

    local ok, err = ngx.timer.at(30, health_check_timer)
    if not ok then
        ngx.log(ngx.ERR, "Failed to create initial health check timer: ", err)
    end
end
--- Compute per-service instance statistics.
-- @treturn table service name -> { total_instances, healthy_instances,
--   health_ratio }, where health_ratio is 0 for a service without instances
function service_discovery.get_service_stats()
    local summary = {}

    for service_name, instances in pairs(services) do
        local total = #instances
        local healthy = 0
        for _, instance in ipairs(instances) do
            if instance.status == "healthy" then
                healthy = healthy + 1
            end
        end

        local ratio = 0
        if total > 0 then
            ratio = healthy / total
        end

        summary[service_name] = {
            total_instances = total,
            healthy_instances = healthy,
            health_ratio = ratio
        }
    end

    return summary
end
return service_discovery
4. 认证与授权
4.1 JWT认证实现
-- gateway/auth.lua - 认证授权模块
local auth = {}
local jwt = require "resty.jwt"
local cjson = require "cjson"
local redis = require "resty.redis"
-- JWT settings.
-- The signing secret is taken from the JWT_SECRET environment variable when it
-- is set (nginx only passes it to workers if the main configuration contains
-- "env JWT_SECRET;"). The literal below is a placeholder kept as fallback so
-- existing setups keep working; it must not be used in production.
local jwt_config = {
    secret = os.getenv("JWT_SECRET") or "your-secret-key",
    algorithm = "HS256",
    expiration = 3600, -- token lifetime in seconds (1 hour)
    issuer = "api-gateway"
}
--- Open a Redis connection to 127.0.0.1:6379 with a 1 second timeout.
-- @treturn table|nil connected client, or nil (after logging) on failure
local function get_redis()
    local client = redis:new()
    client:set_timeout(1000)

    local connected, connect_err = client:connect("127.0.0.1", 6379)
    if connected then
        return client
    end

    ngx.log(ngx.ERR, "Failed to connect to Redis: ", connect_err)
    return nil
end
--- Verify a JWT and return its payload.
-- @tparam string token compact JWT string
-- @treturn table|nil payload on success
-- @treturn string|nil "Invalid token", "Token expired" or "Invalid issuer"
local function verify_jwt_token(token)
    local jwt_obj = jwt:verify(jwt_config.secret, token)

    -- `valid` only says the token could be parsed; `verified` is the field
    -- that reports a successful signature check. Accepting on `valid` alone
    -- lets tokens with a forged signature through.
    if type(jwt_obj) ~= "table" or not jwt_obj.verified then
        return nil, "Invalid token"
    end

    -- Only accept the algorithm this gateway signs with
    local header = jwt_obj.header
    if type(header) ~= "table" or header.alg ~= jwt_config.algorithm then
        return nil, "Invalid token"
    end

    local payload = jwt_obj.payload
    if type(payload) ~= "table" then
        return nil, "Invalid token"
    end

    -- Expiry
    if payload.exp and payload.exp < ngx.time() then
        return nil, "Token expired"
    end

    -- Issuer
    if payload.iss ~= jwt_config.issuer then
        return nil, "Invalid issuer"
    end

    return payload, nil
end
--- Check whether a token id is on the revocation list.
-- Looks up the Redis key "blacklist:<jti>".
-- @param jti token identifier
-- @return truthy when an entry exists; false when Redis is unreachable
--   (the check deliberately lets the request through in that case)
local function is_token_blacklisted(jti)
    local client = get_redis()
    if not client then
        return false -- Redis unavailable: let the request pass
    end

    local entry = client:get("blacklist:" .. jti)
    client:set_keepalive(10000, 100)

    return entry and entry ~= ngx.null
end
--- Load a user record from Redis.
-- Reads the key "user:<user_id>" and decodes its JSON value.
-- @param user_id user identifier
-- @treturn table|nil decoded record, or nil when Redis is unreachable or the
--   key has no value
local function get_user_info(user_id)
    local client = get_redis()
    if not client then
        return nil
    end

    local raw = client:get("user:" .. user_id)
    client:set_keepalive(10000, 100)

    if not raw or raw == ngx.null then
        return nil
    end
    return cjson.decode(raw)
end
--- Build a 401 authentication failure result.
-- @tparam string message value of the `error` field
-- @treturn table { success = false, status = 401, error = message }
local function unauthorized(message)
    return {
        success = false,
        status = 401,
        error = message
    }
end

--- Authenticate the current request from its Bearer token.
-- Verifies the JWT, checks the revocation list, loads the user record and
-- requires the account to be "active".
-- @treturn table { success = true, user = {...} } on success, otherwise
--   { success = false, status = 401, error = <message> }
function auth.authenticate()
    local auth_header = ngx.var.http_authorization
    if not auth_header then
        return unauthorized("Missing authorization header")
    end

    -- Parse "Bearer <token>". The pattern is anchored so that other schemes
    -- that merely contain the word are rejected, and the scheme name is
    -- matched case-insensitively as HTTP auth schemes are.
    local token = string.match(auth_header, "^[Bb][Ee][Aa][Rr][Ee][Rr]%s+(%S+)%s*$")
    if not token then
        return unauthorized("Invalid authorization header format")
    end

    -- Verify the JWT
    local payload, err = verify_jwt_token(token)
    if not payload then
        return unauthorized(err or "Token verification failed")
    end

    -- Revocation list
    if payload.jti and is_token_blacklisted(payload.jti) then
        return unauthorized("Token has been revoked")
    end

    -- A token without a subject cannot be mapped to a user; without this
    -- guard the user lookup would raise while building its Redis key.
    if payload.sub == nil then
        return unauthorized("User not found")
    end

    -- Load the user record
    local user = get_user_info(payload.sub)
    if not user then
        return unauthorized("User not found")
    end

    -- Account state
    if user.status ~= "active" then
        return unauthorized("User account is not active")
    end

    return {
        success = true,
        user = {
            user_id = user.user_id,
            username = user.username,
            email = user.email,
            role = user.role,
            permissions = user.permissions or {},
            token_payload = payload
        }
    }
end
--- Tell whether a list contains a value.
local function list_contains(list, wanted)
    for _, item in ipairs(list) do
        if item == wanted then
            return true
        end
    end
    return false
end

--- Decide whether a user may use a route.
-- Checks, in order: the route's auth_required flag, the presence of a user,
-- security.allowed_roles, security.required_permissions (all must be held)
-- and, when security.resource_check is set, the resource-level check.
-- @tparam table|nil user authenticated user
-- @tparam table route route definition
-- @return true when access is granted, false (or the resource check's
--   result) otherwise
function auth.authorize(user, route)
    if not route.auth_required then
        return true -- route does not require authentication
    end
    if not user then
        return false -- authentication required but no user present
    end

    local security = route.security

    -- Role check
    if security and security.allowed_roles then
        if not list_contains(security.allowed_roles, user.role) then
            return false
        end
    end

    -- Permission check: every required permission must be present
    if security and security.required_permissions then
        for _, required_permission in ipairs(security.required_permissions) do
            if not list_contains(user.permissions, required_permission) then
                return false
            end
        end
    end

    -- Resource-level check
    if security and security.resource_check then
        return auth.check_resource_permission(user, route)
    end

    return true
end
--- Resource-level permission check for the current request.
-- user_service (GET/PUT/DELETE): admins may access everything, other users
-- only the resource whose id equals their own user id.
-- order_service (GET): delegated to auth.check_order_ownership.
-- Everything else is allowed.
-- @tparam table user authenticated user (uses role and user_id)
-- @tparam table route route definition (uses service)
-- @treturn boolean
function auth.check_resource_permission(user, route)
    local method = ngx.var.request_method
    local uri = ngx.var.uri

    -- Resource id = first path segment after the collection name, e.g. "42"
    -- in /api/v1/orders/42. Taking the last segment of the URI would turn the
    -- collection name itself ("orders") into the id for list requests, which
    -- made every list query fail the ownership check.
    local resource_id = string.match(uri, "^/api/v1/[^/]+/([^/]+)")

    if route.service == "user_service" then
        if method == "GET" or method == "PUT" or method == "DELETE" then
            -- Users may only touch their own resource unless they are admins
            if user.role == "admin" then
                return true
            end
            -- The id from the URI is a string; the stored id may be a number
            if resource_id and resource_id == tostring(user.user_id) then
                return true
            end
            return false
        end
    elseif route.service == "order_service" then
        if method == "GET" then
            -- Users may only read their own orders
            return auth.check_order_ownership(user.user_id, resource_id)
        end
    end

    return true
end
--- Check that an order belongs to a user.
-- Reads the Redis key "order:<order_id>:owner".
-- @param user_id id of the requesting user
-- @param order_id id of the order, or nil for list queries
-- @treturn boolean true for list queries, when Redis is unreachable (the
--   backend is then expected to enforce ownership) or when the stored owner
--   equals user_id; false otherwise
function auth.check_order_ownership(user_id, order_id)
    if not order_id then
        return true -- list query: filtering is left to the backend service
    end

    local red = get_redis()
    if not red then
        return true -- Redis unavailable: let the backend service decide
    end

    local order_owner = red:get("order:" .. order_id .. ":owner")
    red:set_keepalive(10000, 100)

    -- nil / false (Redis error) and ngx.null (no such key) both mean
    -- "no known owner"
    if not order_owner or order_owner == ngx.null then
        return false
    end

    -- Redis returns a string while user_id may be a number
    return tostring(order_owner) == tostring(user_id)
end
--- Sign a JWT for a user.
-- Claims: sub, iss, iat, exp (iat + configured lifetime), jti, role, username.
-- @tparam table user record providing user_id, role and username
-- @return the signed token as produced by jwt:sign
function auth.generate_token(user)
    local claims = {
        sub = user.user_id,
        iss = jwt_config.issuer,
        iat = ngx.time(),
        exp = ngx.time() + jwt_config.expiration,
        jti = string.format("%s-%d-%d", user.user_id, ngx.time(), math.random(10000, 99999)),
        role = user.role,
        username = user.username
    }

    local jwt_header = {
        typ = "JWT",
        alg = jwt_config.algorithm
    }

    return jwt:sign(jwt_config.secret, {
        header = jwt_header,
        payload = claims
    })
end
--- Revoke a token by putting its jti on the Redis revocation list.
-- The entry lives until the token would have expired anyway.
-- @tparam string token compact JWT string
-- @treturn boolean true when the token is revoked (or already expired)
-- @treturn string|nil error message on failure
function auth.revoke_token(token)
    local payload, err = verify_jwt_token(token)
    if not payload then
        return false, err
    end
    if not payload.jti then
        return false, "Token does not have JTI"
    end

    local red = get_redis()
    if not red then
        return false, "Redis connection failed"
    end

    -- A token without `exp` never expires on its own; keep the entry for the
    -- configured token lifetime instead of failing on nil arithmetic.
    local ttl = jwt_config.expiration
    if payload.exp then
        ttl = payload.exp - ngx.time()
    end

    if ttl > 0 then
        local ok, set_err = red:setex("blacklist:" .. payload.jti, ttl, "1")
        if not ok then
            -- Do not report success when the entry was not stored
            red:close()
            return false, "Failed to revoke token: " .. tostring(set_err)
        end
    end

    red:set_keepalive(10000, 100)
    return true
end
--- Build an API-key authentication failure result.
local function api_key_failure(status, message)
    return {
        success = false,
        status = status,
        error = message
    }
end

--- Authenticate the current request from its X-API-Key header.
-- Reads the Redis key "api_key:<key>", decodes the JSON record and requires it
-- to be "active" and not past expires_at.
-- @treturn table { success = true, user = {...} } on success, otherwise
--   { success = false, status = <401|500>, error = <message> }
function auth.authenticate_api_key()
    local api_key = ngx.var.http_x_api_key
    if not api_key then
        return api_key_failure(401, "Missing API key")
    end

    local client = get_redis()
    if not client then
        return api_key_failure(500, "Internal server error")
    end

    local raw = client:get("api_key:" .. api_key)
    client:set_keepalive(10000, 100)
    if not raw or raw == ngx.null then
        return api_key_failure(401, "Invalid API key")
    end

    local key_info = cjson.decode(raw)

    -- Key state
    if key_info.status ~= "active" then
        return api_key_failure(401, "API key is not active")
    end

    -- Expiry
    if key_info.expires_at and key_info.expires_at < ngx.time() then
        return api_key_failure(401, "API key has expired")
    end

    return {
        success = true,
        user = {
            user_id = key_info.user_id,
            username = key_info.username,
            role = key_info.role or "api_user",
            permissions = key_info.permissions or {},
            api_key_id = key_info.key_id
        }
    }
end
return auth
5. 限流与熔断
5.1 限流实现
-- gateway/rate_limiter.lua - 限流模块
local rate_limiter = {}
local redis = require "resty.redis"
-- Rate-limiter defaults
local rate_limit_config = {
    default_requests_per_minute = 100, -- used when a route has no rate_limit
    default_burst               = 20,  -- used when a route has no rate_limit
    window_size                 = 60,  -- sliding window length in seconds
    cleanup_interval            = 300  -- seconds between clean-ups of expired data (5 minutes)
}
--- Open a Redis connection to 127.0.0.1:6379 with a 1 second timeout.
-- @treturn table|nil connected client, or nil (after logging) on failure
local function get_redis()
    local client = redis:new()
    client:set_timeout(1000)

    local connected, connect_err = client:connect("127.0.0.1", 6379)
    if not connected then
        ngx.log(ngx.ERR, "Failed to connect to Redis: ", connect_err)
        return nil
    end

    return client
end
--- Sliding-window rate limit backed by a Redis sorted set.
-- Each accepted request is stored as one member scored with the current time;
-- members older than the window are dropped before counting.
-- Fails open: when Redis is unreachable or the script fails, the request is
-- allowed.
-- @tparam string key Redis key of the window
-- @tparam number limit maximum number of requests per window
-- @tparam number window_size window length in seconds
-- @treturn boolean allowed
-- @treturn number current request count (or `limit` on the fail-open paths)
-- @treturn number remaining requests
-- @treturn number reset time (epoch seconds)
local function sliding_window_rate_limit(key, limit, window_size)
    local red = get_redis()
    if not red then
        return true, limit, 0, ngx.time() + window_size -- Redis unavailable: allow
    end

    local current_time = ngx.time()
    local window_start = current_time - window_size

    -- The sorted-set member must be unique per request. It is built here and
    -- not inside the script: Redis documents math.random in scripts as seeded
    -- identically on every execution, so "<second>:<random>" is the same
    -- member for all requests within one second and ZADD just overwrites it,
    -- under-counting the traffic.
    local member = string.format("%.3f:%d:%d",
        ngx.now(), ngx.worker.pid(), math.random(0, 2147483646))

    -- A script keeps trim + count + add atomic
    local script = [[
        local key = KEYS[1]
        local current_time = tonumber(ARGV[1])
        local window_start = tonumber(ARGV[2])
        local limit = tonumber(ARGV[3])
        local window_size = tonumber(ARGV[4])
        local member = ARGV[5]

        -- Drop entries that left the window
        redis.call('ZREMRANGEBYSCORE', key, 0, window_start)

        -- Requests currently inside the window
        local current_requests = redis.call('ZCARD', key)
        if current_requests < limit then
            -- Record this request
            redis.call('ZADD', key, current_time, member)
            redis.call('EXPIRE', key, window_size * 2)
            return {1, current_requests + 1, limit - current_requests - 1, current_time + window_size}
        else
            return {0, current_requests, 0, current_time + window_size}
        end
    ]]

    local result, err = red:eval(script, 1, key,
        current_time, window_start, limit, window_size, member)
    if not result then
        -- Do not return a possibly broken connection to the pool
        ngx.log(ngx.ERR, "Rate limit script failed: ", err)
        red:close()
        return true, limit, 0, current_time + window_size
    end
    red:set_keepalive(10000, 100)

    if type(result) == "table" and #result == 4 then
        return result[1] == 1, result[2], result[3], result[4]
    end
    return true, limit, 0, current_time + window_size
end
--- Token-bucket rate limit backed by a Redis hash.
-- The bucket holds at most `burst` tokens and gains `refill_rate` tokens per
-- minute; each allowed request consumes one token.
-- Fails open: when Redis is unreachable or the script fails, the request is
-- allowed.
-- @tparam string key Redis key of the bucket
-- @tparam number limit nominal limit, returned unchanged to the caller
-- @tparam number burst bucket capacity
-- @tparam number refill_rate tokens added per minute
-- @treturn boolean allowed
-- @treturn number limit
-- @treturn number remaining tokens
-- @treturn number reset time (epoch seconds)
local function token_bucket_rate_limit(key, limit, burst, refill_rate)
    local red = get_redis()
    if not red then
        return true, limit, burst, ngx.time() + 60
    end

    local current_time = ngx.time()

    local script = [[
        local key = KEYS[1]
        local current_time = tonumber(ARGV[1])
        local limit = tonumber(ARGV[2])
        local burst = tonumber(ARGV[3])
        local refill_rate = tonumber(ARGV[4])

        local bucket_data = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket_data[1]) or burst
        local last_refill = tonumber(bucket_data[2]) or current_time

        -- Tokens earned since the last refill (refill_rate tokens per minute).
        -- last_refill only moves forward when at least one whole token was
        -- added; advancing it on every call discards the elapsed time and a
        -- bucket under steady traffic would never refill at low rates.
        local time_passed = current_time - last_refill
        local tokens_to_add = math.floor(time_passed * refill_rate / 60)
        if tokens_to_add > 0 then
            last_refill = current_time
        end
        tokens = math.min(burst, tokens + tokens_to_add)

        local allowed = 0
        if tokens >= 1 then
            tokens = tokens - 1
            allowed = 1
        end

        redis.call('HMSET', key, 'tokens', tokens, 'last_refill', last_refill)
        redis.call('EXPIRE', key, 3600) -- expire idle buckets after 1 hour

        if allowed == 1 then
            return {1, limit, tokens, current_time + 60}
        end
        return {0, limit, 0, current_time + math.ceil(60 / refill_rate)}
    ]]

    local result, err = red:eval(script, 1, key, current_time, limit, burst, refill_rate)
    if not result then
        -- Do not return a possibly broken connection to the pool
        ngx.log(ngx.ERR, "Token bucket script failed: ", err)
        red:close()
        return true, limit, burst, current_time + 60
    end
    red:set_keepalive(10000, 100)

    if type(result) == "table" and #result == 4 then
        return result[1] == 1, result[2], result[3], result[4]
    end
    return true, limit, burst, current_time + 60
end
--- Apply the per-route rate limit for a user (or for the client IP when no
-- user is given), using the sliding-window limiter.
-- @tparam table|nil user authenticated user (uses user_id)
-- @tparam table route route definition (uses id and optional rate_limit)
-- @treturn table { allowed, limit, current, remaining, reset_time }
function rate_limiter.check_rate_limit(user, route)
    local settings = route.rate_limit
    if not settings then
        settings = {
            requests_per_minute = rate_limit_config.default_requests_per_minute,
            burst = rate_limit_config.default_burst
        }
    end
    local limit = settings.requests_per_minute

    -- Build the limiter key
    local scope, subject
    if user then
        scope, subject = "rate_limit:user:%s:%s", user.user_id
    else
        scope, subject = "rate_limit:ip:%s:%s", ngx.var.remote_addr
    end
    local rate_limit_key = string.format(scope, subject, route.id)

    -- Sliding-window check
    local allowed, current_requests, remaining, reset_time =
        sliding_window_rate_limit(rate_limit_key, limit, rate_limit_config.window_size)

    local outcome = {}
    outcome.allowed = allowed
    outcome.limit = limit
    outcome.current = current_requests
    outcome.remaining = remaining
    outcome.reset_time = reset_time
    return outcome
end
--- Apply the gateway-wide rate limit (10000 requests per window) using the
-- key "rate_limit:global".
-- @treturn table { allowed, limit, current, remaining, reset_time }
function rate_limiter.check_global_rate_limit()
    local global_limit = 10000 -- requests per minute for the whole gateway

    local allowed, current_requests, remaining, reset_time =
        sliding_window_rate_limit("rate_limit:global", global_limit, rate_limit_config.window_size)

    local outcome = {}
    outcome.allowed = allowed
    outcome.limit = global_limit
    outcome.current = current_requests
    outcome.remaining = remaining
    outcome.reset_time = reset_time
    return outcome
end
--- Apply the per-client-IP rate limit (200 requests per window) using the key
-- "rate_limit:ip:<remote_addr>".
-- @treturn table { allowed, limit, current, remaining, reset_time }
function rate_limiter.check_ip_rate_limit()
    local ip_limit = 200 -- requests per minute for a single IP
    local client_key = "rate_limit:ip:" .. ngx.var.remote_addr

    local allowed, current_requests, remaining, reset_time =
        sliding_window_rate_limit(client_key, ip_limit, rate_limit_config.window_size)

    local outcome = {}
    outcome.allowed = allowed
    outcome.limit = ip_limit
    outcome.current = current_requests
    outcome.remaining = remaining
    outcome.reset_time = reset_time
    return outcome
end
--- Collect rate-limit statistics from Redis.
-- @treturn table { timestamp, global = { current_requests }, users = { [user_id]
--   = request count summed over that user's routes }, ips = {} }, or
--   { error = "Redis connection failed" } when Redis is unreachable
function rate_limiter.get_rate_limit_stats()
    local red = get_redis()
    if not red then
        return {error = "Redis connection failed"}
    end

    local stats = {
        timestamp = ngx.time(),
        global = {},
        users = {},
        ips = {}
    }

    -- Global window
    local global_count = red:zcard("rate_limit:global")
    stats.global.current_requests = tonumber(global_count) or 0

    -- Per-user windows. SCAN walks the keyspace incrementally; KEYS examines
    -- every key in a single blocking call and stalls Redis on large datasets.
    local seen = {} -- SCAN may return the same key more than once
    local cursor = "0"
    repeat
        local page, err = red:scan(cursor, "MATCH", "rate_limit:user:*", "COUNT", 100)
        if type(page) ~= "table" then
            ngx.log(ngx.ERR, "Failed to scan rate limit keys: ", err)
            break
        end

        cursor = tostring(page[1])
        for _, key in ipairs(page[2] or {}) do
            if not seen[key] then
                seen[key] = true
                local user_id = string.match(key, "rate_limit:user:([^:]+):")
                if user_id then
                    -- A user has one key per route: add the counts up rather
                    -- than keeping whichever key happened to come last
                    local count = tonumber(red:zcard(key)) or 0
                    stats.users[user_id] = (stats.users[user_id] or 0) + count
                end
            end
        end
    until cursor == "0"

    red:set_keepalive(10000, 100)
    return stats
end
return rate_limiter
5.2 熔断器实现
-- gateway/circuit_breaker.lua - 熔断器模块
local circuit_breaker = {}
local cjson = require "cjson"
-- Circuit-breaker settings
local circuit_config = {
    failure_threshold = 5,  -- failure threshold (divided by 10 to obtain the tripping failure rate)
    success_threshold = 3,  -- recent successes needed in half-open state to close again
    timeout           = 60, -- seconds an open circuit waits before going half-open
    window_size       = 60, -- statistics window in seconds
    min_requests      = 10  -- minimum requests in the window before the circuit may trip
}
-- Circuit-breaker states
local CIRCUIT_STATES = {
    CLOSED    = "closed",   -- normal operation
    OPEN      = "open",     -- tripped
    HALF_OPEN = "half_open" -- probing after the open timeout
}
--- Read the state of a service's circuit from the shared dictionary.
-- An OPEN circuit whose timeout has elapsed is switched to HALF_OPEN and
-- stored back before returning.
-- @tparam string service_name service identifier
-- @treturn string current state; CLOSED when the dictionary or the entry is
--   missing
local function get_circuit_state(service_name)
    local dict = ngx.shared.circuit_breaker
    if not dict then
        return CIRCUIT_STATES.CLOSED
    end

    local state_key = service_name .. ":state"
    local raw = dict:get(state_key)
    if not raw then
        return CIRCUIT_STATES.CLOSED
    end

    local info = cjson.decode(raw)

    -- Promote OPEN to HALF_OPEN once the open timeout has passed
    if info.state == CIRCUIT_STATES.OPEN then
        local now = ngx.time()
        if now >= info.open_time + circuit_config.timeout then
            info.state = CIRCUIT_STATES.HALF_OPEN
            info.half_open_time = now
            dict:set(state_key, cjson.encode(info), 3600)
            ngx.log(ngx.INFO, "Circuit breaker for ", service_name, " changed to HALF_OPEN")
        end
    end

    return info.state
end
--- Store a new state for a service's circuit (kept for 3600 seconds) and log
-- the change. Does nothing when the shared dictionary is missing.
-- @tparam string service_name service identifier
-- @tparam string state one of CIRCUIT_STATES
local function set_circuit_state(service_name, state)
    local dict = ngx.shared.circuit_breaker
    if not dict then
        return
    end

    local now = ngx.time()
    local info = {
        state = state,
        timestamp = now
    }

    -- Remember when the circuit entered the OPEN / HALF_OPEN state
    if state == CIRCUIT_STATES.OPEN then
        info.open_time = now
    elseif state == CIRCUIT_STATES.HALF_OPEN then
        info.half_open_time = now
    end

    dict:set(service_name .. ":state", cjson.encode(info), 3600)
    ngx.log(ngx.WARN, "Circuit breaker for ", service_name, " changed to ", state)
end
--- Add one request outcome to a service's statistics window.
-- Entries older than the window are dropped and their contribution removed
-- from the counters before the new outcome is appended. The result is stored
-- in the shared dictionary for twice the window length.
-- @tparam string service_name service identifier
-- @tparam boolean success whether the request succeeded
-- @tparam number response_time response time of the request
-- @treturn table|nil updated statistics, or nil when the shared dictionary is
--   missing
local function record_request_result(service_name, success, response_time)
    local dict = ngx.shared.circuit_breaker
    if not dict then
        return
    end

    local now = ngx.time()
    local oldest_allowed = now - circuit_config.window_size

    -- Load the current statistics or start fresh
    local stats_key = service_name .. ":stats"
    local raw = dict:get(stats_key)
    local stats
    if raw then
        stats = cjson.decode(raw)
    end
    if not stats then
        stats = {
            requests = {},
            total_requests = 0,
            failed_requests = 0,
            total_response_time = 0
        }
    end

    -- Keep entries inside the window, subtract the expired ones
    local kept = {}
    for _, entry in ipairs(stats.requests) do
        if entry.timestamp >= oldest_allowed then
            kept[#kept + 1] = entry
        else
            stats.total_requests = stats.total_requests - 1
            if not entry.success then
                stats.failed_requests = stats.failed_requests - 1
            end
            stats.total_response_time = stats.total_response_time - entry.response_time
        end
    end

    -- Append the new outcome
    kept[#kept + 1] = {
        timestamp = now,
        success = success,
        response_time = response_time
    }
    stats.requests = kept
    stats.total_requests = stats.total_requests + 1
    stats.total_response_time = stats.total_response_time + response_time
    if not success then
        stats.failed_requests = stats.failed_requests + 1
    end

    dict:set(stats_key, cjson.encode(stats), circuit_config.window_size * 2)
    return stats
end
--- Decide whether the statistics justify opening the circuit.
-- @tparam table stats uses total_requests and failed_requests
-- @treturn boolean true when at least min_requests were seen and the failure
--   rate reaches failure_threshold / 10
local function should_trip_circuit(stats)
    local total = stats.total_requests
    if total < circuit_config.min_requests then
        return false
    end

    -- The configured threshold is converted to a ratio
    local threshold_rate = circuit_config.failure_threshold / 10
    local failure_rate = stats.failed_requests / total
    return failure_rate >= threshold_rate
end
--- Report the current circuit state of a service.
-- @tparam string service_name service identifier
-- @treturn table { state, service, timestamp }
function circuit_breaker.check_circuit(service_name)
    local snapshot = {}
    snapshot.state = get_circuit_state(service_name)
    snapshot.service = service_name
    snapshot.timestamp = ngx.time()
    return snapshot
end
--- Record the outcome of a request and drive the circuit state machine.
-- CLOSED: trips to OPEN when the window statistics exceed the failure rate.
-- HALF_OPEN: a failure re-opens the circuit; success_threshold successes
-- within the last 30 seconds close it.
-- OPEN: the outcome is ignored.
-- @tparam string service_name service identifier
-- @tparam boolean success whether the request succeeded
-- @tparam number|nil response_time response time (defaults to 0)
function circuit_breaker.record_result(service_name, success, response_time)
    local state = get_circuit_state(service_name)

    -- While the circuit is open the gateway answers 503 itself. Counting those
    -- short-circuited responses as upstream failures would fill the window
    -- with failures the upstream never produced.
    if state == CIRCUIT_STATES.OPEN then
        return
    end

    -- Update the statistics; nil means the shared dictionary is unavailable,
    -- in which case there is nothing to evaluate.
    local stats = record_request_result(service_name, success, response_time or 0)
    if not stats then
        return
    end

    if state == CIRCUIT_STATES.CLOSED then
        -- Closed: check whether the circuit has to trip
        if should_trip_circuit(stats) then
            set_circuit_state(service_name, CIRCUIT_STATES.OPEN)
        end
    elseif state == CIRCUIT_STATES.HALF_OPEN then
        if not success then
            -- A failed probe re-opens the circuit
            set_circuit_state(service_name, CIRCUIT_STATES.OPEN)
            return
        end

        -- A successful probe: count the successes of the last 30 seconds
        local recent_successes = 0
        local recent_from = ngx.time() - 30
        for _, request in ipairs(stats.requests) do
            if request.timestamp >= recent_from and request.success then
                recent_successes = recent_successes + 1
            end
        end

        if recent_successes >= circuit_config.success_threshold then
            set_circuit_state(service_name, CIRCUIT_STATES.CLOSED)
        end
    end
end
--- Report state and window statistics for one service.
-- @tparam string service_name service identifier
-- @treturn table { service, state, total_requests, failed_requests,
--   success_requests, failure_rate, avg_response_time, timestamp }, or
--   { error = ... } when the shared dictionary is missing
function circuit_breaker.get_circuit_stats(service_name)
    local dict = ngx.shared.circuit_breaker
    if not dict then
        return {error = "Circuit breaker cache not available"}
    end

    local state = get_circuit_state(service_name)

    local raw = dict:get(service_name .. ":stats")
    local stats
    if raw then
        stats = cjson.decode(raw)
    end
    if not stats then
        stats = {
            total_requests = 0,
            failed_requests = 0,
            total_response_time = 0
        }
    end

    local total = stats.total_requests
    local failure_rate = 0
    local avg_response_time = 0
    if total > 0 then
        failure_rate = stats.failed_requests / total
        avg_response_time = stats.total_response_time / total
    end

    return {
        service = service_name,
        state = state,
        total_requests = total,
        failed_requests = stats.failed_requests,
        success_requests = total - stats.failed_requests,
        failure_rate = failure_rate,
        avg_response_time = avg_response_time,
        timestamp = ngx.time()
    }
end
--- Collect breaker statistics for every known service.
-- The service list is a fixed literal inside this function.
-- @treturn table map of service name to `get_circuit_stats` result, or
--   `{error = ...}` when the `circuit_breaker` shared dict is unavailable
function circuit_breaker.get_all_circuits_stats()
  if not ngx.shared.circuit_breaker then
    return {error = "Circuit breaker cache not available"}
  end

  local known_services = {"user_service", "order_service", "payment_service"}
  local by_service = {}
  for i = 1, #known_services do
    local name = known_services[i]
    by_service[name] = circuit_breaker.get_circuit_stats(name)
  end
  return by_service
end
--- Manually reset a service's breaker by deleting its stored state and
-- statistics from the `circuit_breaker` shared dict.
-- @tparam string service_name
-- @treturn boolean true on success, false when the shared dict is unavailable
-- @treturn string human-readable result message
function circuit_breaker.reset_circuit(service_name)
  local cache = ngx.shared.circuit_breaker
  if not cache then
    return false, "Circuit breaker cache not available"
  end

  for _, suffix in ipairs({":state", ":stats"}) do
    cache:delete(service_name .. suffix)
  end

  ngx.log(ngx.INFO, "Circuit breaker for ", service_name, " has been reset")
  return true, "Circuit breaker reset successfully"
end
return circuit_breaker
6. 监控与管理
6.1 监控指标收集
-- gateway/monitor.lua - 监控模块
local monitor = {}
local cjson = require "cjson"
local http = require "resty.http"
-- Monitoring configuration.
-- NOTE(review): none of the monitor functions visible in this module appear
-- to read these values -- confirm whether they are consumed elsewhere.
local monitor_config = {
metrics_interval = 60, -- metrics collection interval (seconds)
retention_period = 3600, -- data retention period (seconds)
prometheus_enabled = true,
prometheus_url = "http://localhost:9090"
}
--- Record the start of a request.
-- Stamps `ngx.ctx.request_start_time` with `ngx.now()` and increments the
-- global and per-service request counters in the `gateway_cache` shared dict.
-- Status-code counters are updated later, when the request ends.
function monitor.record_request_start()
  -- Stamp the start time before anything else so the request duration can
  -- still be computed when the shared dict is not configured.
  ngx.ctx.request_start_time = ngx.now()

  local cache = ngx.shared.gateway_cache
  if not cache then
    return
  end

  -- Total request counter (created at 0 on first use).
  cache:incr("metrics:total_requests", 1, 0)

  -- Per-service counter. An empty string is truthy in Lua, so it is
  -- rejected explicitly to avoid a "metrics:service::requests" key.
  local service = ngx.var.service_name
  if service and service ~= "" then
    cache:incr("metrics:service:" .. service .. ":requests", 1, 0)
  end
end
--- Record the completion of a request in the `gateway_cache` shared dict.
-- Updates the per-status counters, the per-service status / response-time /
-- error counters and the per-user request counter.
-- @tparam table request_info fields read here: `service` (string, optional),
--   `status` (number or numeric string), `response_time` (number, optional),
--   `user_id` (optional)
function monitor.record_request_end(request_info)
  local cache = ngx.shared.gateway_cache
  if not cache or type(request_info) ~= "table" then
    return
  end

  local service = request_info.service
  -- Normalise so that a numeric string such as "502" can be compared with
  -- 400 below, and a missing status does not raise on concatenation.
  local status = tonumber(request_info.status)
  local response_time = tonumber(request_info.response_time) or 0

  -- Per-status counter.
  if status then
    cache:incr("metrics:status:" .. status, 1, 0)
  end

  if service and service ~= "" then
    local prefix = "metrics:service:" .. service

    if status then
      cache:incr(prefix .. ":status:" .. status, 1, 0)
    end

    -- Response-time accumulation. `incr` is atomic across workers, unlike a
    -- get-then-set pair, which loses updates when two workers interleave.
    local total_key = prefix .. ":response_time:total"
    local count_key = prefix .. ":response_time:count"
    cache:incr(total_key, response_time, 0)
    cache:incr(count_key, 1, 0)
    -- Keep the sliding one-hour expiry that the previous `set` calls applied.
    cache:expire(total_key, 3600)
    cache:expire(count_key, 3600)

    -- Anything from 400 upwards counts as an error for the service.
    if status and status >= 400 then
      cache:incr(prefix .. ":errors", 1, 0)
    end
  end

  -- Per-user request counter.
  -- NOTE(review): one key per user id and no expiry -- confirm the shared
  -- dict is sized for the expected number of distinct users.
  if request_info.user_id then
    cache:incr("metrics:user:" .. request_info.user_id .. ":requests", 1, 0)
  end
end
--- Build a snapshot of the counters stored in the `gateway_cache` shared dict.
-- Only the status codes and services listed inside this function are
-- reported; counters that do not exist read as 0. `response_times` is
-- returned as an empty table.
-- @treturn table `{timestamp, total_requests, status_codes, services,
--   response_times}`, or `{error = "Cache not available"}`
function monitor.get_metrics()
  local cache = ngx.shared.gateway_cache
  if not cache then
    return {error = "Cache not available"}
  end

  -- Read one counter, defaulting to 0 when it is absent.
  local function counter(key)
    return cache:get(key) or 0
  end

  local snapshot = {
    timestamp = ngx.time(),
    total_requests = counter("metrics:total_requests"),
    status_codes = {},
    services = {},
    response_times = {}
  }

  -- Per-status counters, keyed by the status code as a string.
  local tracked_statuses = {200, 201, 400, 401, 403, 404, 429, 500, 502, 503}
  for i = 1, #tracked_statuses do
    local code = tracked_statuses[i]
    snapshot.status_codes[tostring(code)] = counter("metrics:status:" .. code)
  end

  -- Per-service counters and derived ratios.
  local tracked_services = {"user_service", "order_service", "payment_service"}
  for i = 1, #tracked_services do
    local name = tracked_services[i]
    local prefix = "metrics:service:" .. name

    local requests = counter(prefix .. ":requests")
    local errors = counter(prefix .. ":errors")
    local rt_total = counter(prefix .. ":response_time:total")
    local rt_count = counter(prefix .. ":response_time:count")

    local entry = {
      requests = requests,
      errors = errors,
      error_rate = 0,
      avg_response_time = 0
    }
    if requests > 0 then
      entry.error_rate = errors / requests
    end
    if rt_count > 0 then
      entry.avg_response_time = rt_total / rt_count
    end
    snapshot.services[name] = entry
  end

  return snapshot
end
--- Render the current metrics in the Prometheus text exposition format.
-- Every metric family is written as one contiguous group: its HELP and TYPE
-- lines followed directly by all of its samples, as the format requires.
-- Label values are emitted in sorted order so the output is deterministic.
-- @treturn[1] string the metrics text, lines joined with "\n"
-- @treturn[2] nil
-- @treturn[2] string error message taken from `monitor.get_metrics()`
function monitor.format_prometheus_metrics()
  local metrics = monitor.get_metrics()
  if metrics.error then
    return nil, metrics.error
  end

  local lines = {}
  local function emit(line)
    lines[#lines + 1] = line
  end

  -- `pairs` order is unspecified, so collect and sort the keys.
  local function sorted_keys(tbl)
    local keys = {}
    for key in pairs(tbl) do
      keys[#keys + 1] = key
    end
    table.sort(keys)
    return keys
  end

  -- Total number of requests.
  emit("# HELP gateway_requests_total Total number of requests")
  emit("# TYPE gateway_requests_total counter")
  emit(string.format("gateway_requests_total %d", metrics.total_requests))

  -- Requests by status code.
  emit("# HELP gateway_requests_by_status_total Requests by status code")
  emit("# TYPE gateway_requests_by_status_total counter")
  for _, status in ipairs(sorted_keys(metrics.status_codes)) do
    emit(string.format(
      "gateway_requests_by_status_total{status=\"%s\"} %d",
      status, metrics.status_codes[status]))
  end

  -- Per-service families: emit each family completely before starting the
  -- next one instead of interleaving the samples service by service.
  local families = {
    {name = "gateway_service_requests_total", kind = "counter",
     help = "Requests by service", field = "requests", value_fmt = "%d"},
    {name = "gateway_service_errors_total", kind = "counter",
     help = "Errors by service", field = "errors", value_fmt = "%d"},
    {name = "gateway_service_response_time_avg", kind = "gauge",
     help = "Average response time by service", field = "avg_response_time",
     value_fmt = "%.2f"},
  }
  local service_names = sorted_keys(metrics.services)
  for _, family in ipairs(families) do
    emit("# HELP " .. family.name .. " " .. family.help)
    emit("# TYPE " .. family.name .. " " .. family.kind)
    local sample_fmt = family.name .. "{service=\"%s\"} " .. family.value_fmt
    for _, service in ipairs(service_names) do
      emit(string.format(sample_fmt, service,
        metrics.services[service][family.field]))
    end
  end

  return table.concat(lines, "\n")
end
return monitor
6.2 管理接口实现
-- gateway/admin.lua - 管理接口模块
local admin = {}
local cjson = require "cjson"
local router = require "gateway.router"
local circuit_breaker = require "gateway.circuit_breaker"
local rate_limiter = require "gateway.rate_limiter"
local monitor = require "gateway.monitor"
local service_discovery = require "gateway.service_discovery"
-- Admin endpoints in match order: {URI prefix, name of the handler on `admin`}.
-- Handlers are looked up by name at request time because they are defined
-- further down in this module.
local ADMIN_ENDPOINTS = {
  {"/admin/routes", "handle_routes_request"},
  {"/admin/services", "handle_services_request"},
  {"/admin/circuit-breakers", "handle_circuit_breakers_request"},
  {"/admin/rate-limits", "handle_rate_limits_request"},
  {"/admin/metrics", "handle_metrics_request"},
  {"/admin/health", "handle_health_request"},
}

--- Entry point for admin requests: dispatch on the URI prefix.
-- Sets the response content type to JSON, then hands the request method and
-- URI to the first handler whose prefix matches. Unknown URIs get a 404
-- JSON error.
function admin.handle_request()
  local method = ngx.var.request_method
  local uri = ngx.var.uri
  ngx.header.content_type = "application/json"

  for _, endpoint in ipairs(ADMIN_ENDPOINTS) do
    -- Plain (non-pattern) prefix test. In a Lua pattern "-" is a lazy
    -- repetition operator, so "circuit-breakers" / "rate-limits" used as
    -- patterns never match the literal hyphenated paths.
    if string.find(uri, endpoint[1], 1, true) == 1 then
      admin[endpoint[2]](method, uri)
      return
    end
  end

  ngx.status = 404
  ngx.say(cjson.encode({error = "Endpoint not found"}))
end
--- Route management endpoint.
--   GET    /admin/routes        list all routes
--   GET    /admin/routes/<id>   fetch one route (404 when unknown)
--   POST   /admin/routes...     add a route from the JSON request body
--   DELETE /admin/routes/<id>   remove a route
-- Any other method yields 405. Responses are JSON written with `ngx.say`.
-- @tparam string method HTTP request method
-- @tparam string uri request URI
function admin.handle_routes_request(method, uri)
  -- Write a JSON response; `status` is left untouched when nil.
  local function reply(status, payload)
    if status then
      ngx.status = status
    end
    ngx.say(cjson.encode(payload))
  end

  if method == "GET" then
    if string.match(uri, "/admin/routes$") then
      -- All routes.
      return reply(nil, {routes = router.get_all_routes()})
    end

    -- A single route.
    local route_id = string.match(uri, "/admin/routes/([^/]+)$")
    if not route_id then
      return reply(400, {error = "Invalid route ID"})
    end
    local route = router.get_route_config(route_id)
    if not route then
      return reply(404, {error = "Route not found"})
    end
    return reply(nil, {route = route})
  end

  if method == "POST" then
    -- Add a new route.
    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    if not body then
      return reply(400, {error = "Request body required"})
    end

    local decoded_ok, route_config = pcall(cjson.decode, body)
    if not decoded_ok then
      return reply(400, {error = "Invalid JSON"})
    end
    -- Valid JSON is not necessarily an object: `null`, numbers and strings
    -- also decode successfully and must not reach the router.
    if type(route_config) ~= "table" then
      return reply(400, {error = "Route config must be a JSON object"})
    end

    local ok, message = router.add_route(route_config)
    if ok then
      return reply(201, {message = message, route = route_config})
    end
    return reply(400, {error = message})
  end

  if method == "DELETE" then
    -- Remove a route.
    local route_id = string.match(uri, "/admin/routes/([^/]+)$")
    if not route_id then
      return reply(400, {error = "Route ID required"})
    end
    local ok, message = router.remove_route(route_id)
    if ok then
      return reply(nil, {message = message})
    end
    return reply(404, {error = message})
  end

  return reply(405, {error = "Method not allowed"})
end
--- Service management endpoint.
--   GET  /admin/services                    statistics for all services
--   GET  /admin/services/<name>             instances of one service
--   POST /admin/services/<name>/instances   register an instance from the
--                                           JSON request body
-- Any other method yields 405. Responses are JSON written with `ngx.say`.
-- @tparam string method HTTP request method
-- @tparam string uri request URI
function admin.handle_services_request(method, uri)
  -- Write a JSON response; `status` is left untouched when nil.
  local function reply(status, payload)
    if status then
      ngx.status = status
    end
    ngx.say(cjson.encode(payload))
  end

  if method == "GET" then
    if string.match(uri, "/admin/services$") then
      -- Statistics for all services.
      return reply(nil, {services = service_discovery.get_service_stats()})
    end

    -- Instances of a single service.
    local service_name = string.match(uri, "/admin/services/([^/]+)$")
    if not service_name then
      return reply(400, {error = "Invalid service name"})
    end
    local instances = service_discovery.get_service_instances(service_name)
    return reply(nil, {service = service_name, instances = instances})
  end

  if method == "POST" then
    -- Register a service instance.
    local service_name = string.match(uri, "/admin/services/([^/]+)/instances$")
    if not service_name then
      return reply(400, {error = "Service name required"})
    end

    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    if not body then
      return reply(400, {error = "Request body required"})
    end

    local decoded_ok, instance = pcall(cjson.decode, body)
    if not decoded_ok then
      return reply(400, {error = "Invalid JSON"})
    end
    -- Valid JSON is not necessarily an object: `null`, numbers and strings
    -- also decode successfully and must not be registered as an instance.
    if type(instance) ~= "table" then
      return reply(400, {error = "Service instance must be a JSON object"})
    end

    if service_discovery.register_service(service_name, instance) then
      return reply(201, {message = "Service instance registered", instance = instance})
    end
    return reply(500, {error = "Failed to register service instance"})
  end

  return reply(405, {error = "Method not allowed"})
end
--- Circuit-breaker management endpoint.
--   GET  /admin/circuit-breakers                 state of all breakers
--   GET  /admin/circuit-breakers/<name>          state of one breaker
--   POST /admin/circuit-breakers/<name>/reset    reset one breaker
-- Any other method yields 405. Responses are JSON written with `ngx.say`.
--
-- The hyphen in "circuit-breakers" is written as "%-" in every pattern:
-- unescaped, "-" is a lazy repetition operator in Lua patterns and the
-- literal path would never match.
-- @tparam string method HTTP request method
-- @tparam string uri request URI
function admin.handle_circuit_breakers_request(method, uri)
  if method == "GET" then
    if string.match(uri, "/admin/circuit%-breakers$") then
      -- State of all breakers.
      local stats = circuit_breaker.get_all_circuits_stats()
      ngx.say(cjson.encode({circuit_breakers = stats}))
    else
      -- State of a single service's breaker.
      local service_name = string.match(uri, "/admin/circuit%-breakers/([^/]+)$")
      if service_name then
        local stats = circuit_breaker.get_circuit_stats(service_name)
        ngx.say(cjson.encode({circuit_breaker = stats}))
      else
        ngx.status = 400
        ngx.say(cjson.encode({error = "Invalid service name"}))
      end
    end
  elseif method == "POST" then
    -- Reset a breaker.
    local service_name = string.match(uri, "/admin/circuit%-breakers/([^/]+)/reset$")
    if service_name then
      local ok, message = circuit_breaker.reset_circuit(service_name)
      if ok then
        ngx.say(cjson.encode({message = message}))
      else
        ngx.status = 500
        ngx.say(cjson.encode({error = message}))
      end
    else
      ngx.status = 400
      ngx.say(cjson.encode({error = "Service name required"}))
    end
  else
    ngx.status = 405
    ngx.say(cjson.encode({error = "Method not allowed"}))
  end
end
--- Rate-limit management endpoint (read-only).
-- GET returns the statistics from `rate_limiter.get_rate_limit_stats()` as
-- JSON; every other method yields 405.
-- @tparam string method HTTP request method
-- @tparam string uri request URI (not inspected)
function admin.handle_rate_limits_request(method, uri)
  if method ~= "GET" then
    ngx.status = 405
    ngx.say(cjson.encode({error = "Method not allowed"}))
    return
  end

  local payload = {rate_limits = rate_limiter.get_rate_limit_stats()}
  ngx.say(cjson.encode(payload))
end
--- Metrics endpoint.
--   GET /admin/metrics/prometheus   Prometheus text output (text/plain)
--   GET any other metrics URI       JSON snapshot from `monitor.get_metrics()`
-- Every other method yields 405.
-- @tparam string method HTTP request method
-- @tparam string uri request URI
function admin.handle_metrics_request(method, uri)
  if method ~= "GET" then
    ngx.status = 405
    ngx.say(cjson.encode({error = "Method not allowed"}))
    return
  end

  local wants_prometheus = string.match(uri, "/admin/metrics/prometheus$")
  if not wants_prometheus then
    -- JSON format.
    ngx.say(cjson.encode({metrics = monitor.get_metrics()}))
    return
  end

  -- Prometheus format.
  local text, err = monitor.format_prometheus_metrics()
  if not text then
    ngx.status = 500
    ngx.header.content_type = "application/json"
    ngx.say(cjson.encode({error = err or "Failed to generate metrics"}))
    return
  end

  ngx.header.content_type = "text/plain"
  ngx.say(text)
end
--- Health-check endpoint.
-- GET responds with a JSON document holding a fixed "healthy" status, the
-- current time, a version string, the service-discovery statistics and the
-- circuit-breaker statistics. Every other method yields 405.
-- @tparam string method HTTP request method
-- @tparam string uri request URI (not inspected)
function admin.handle_health_request(method, uri)
  if method ~= "GET" then
    ngx.status = 405
    ngx.say(cjson.encode({error = "Method not allowed"}))
    return
  end

  local report = {status = "healthy", version = "1.0.0"}
  report.timestamp = ngx.time()
  report.services = service_discovery.get_service_stats()
  report.circuit_breakers = circuit_breaker.get_all_circuits_stats()
  ngx.say(cjson.encode(report))
end
return admin
7. 部署与最佳实践
7.1 生产环境部署
#!/bin/bash
# Production deployment script for the OpenResty gateway.
# The shebang has to be the very first line of the file to take effect.

# Stop at the first failing command, unset variable or failed pipeline stage,
# so a failed copy never leads to starting a half-installed gateway.
set -euo pipefail

# Create the directory layout
mkdir -p /opt/gateway/{conf,logs,lua,ssl}
mkdir -p /opt/gateway/lua/{gateway,vendor}

# Copy the configuration and the Lua sources
cp nginx.conf /opt/gateway/conf/
cp -r lua/gateway/* /opt/gateway/lua/gateway/

# Set ownership and permissions
chown -R nginx:nginx /opt/gateway
chmod -R 755 /opt/gateway
# Private keys must not be readable by group/others.
chmod -R go-rwx /opt/gateway/ssl

# Start the service and enable it at boot
systemctl start openresty
systemctl enable openresty
7.2 性能优化建议
- 连接池优化:合理配置upstream连接池大小
- 缓存策略:充分利用共享内存和Redis缓存
- 负载均衡:选择合适的负载均衡算法
- 监控告警:建立完善的监控和告警机制
- 安全防护:实施多层安全防护措施
7.3 故障排查
- 日志分析:通过日志快速定位问题
- 性能监控:实时监控关键指标
- 熔断恢复:及时处理服务熔断
- 限流调整:根据业务需求调整限流策略
总结
OpenResty作为API网关在微服务架构中发挥着重要作用,提供了:
- 统一入口:所有外部请求的统一接入点
- 服务治理:路由、负载均衡、服务发现
- 安全控制:认证、授权、限流、防护
- 监控运维:日志、监控、管理接口
- 高性能:基于Nginx的高并发处理能力
通过合理的架构设计和配置优化,OpenResty可以构建出高性能、高可用的微服务API网关系统。