1. 负载均衡概述
1.1 负载均衡的重要性
在现代Web应用架构中,负载均衡是确保系统高可用性和性能的关键技术:
- 分散负载:将请求分散到多个后端服务器
- 提高可用性:单点故障不会影响整体服务
- 水平扩展:通过增加服务器提升处理能力
- 故障隔离:自动检测和隔离故障节点
- 性能优化:根据服务器性能分配请求
1.2 OpenResty负载均衡架构
     ┌─────────────────┐
     │   客户端请求    │
     └────────┬────────┘
              │
     ┌────────▼────────┐
     │    OpenResty    │  ← 负载均衡器
     │   (Nginx+Lua)   │
     └────────┬────────┘
              │
    ┌─────────┼─────────┐
    │         │         │
┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│Server1│ │Server2│ │Server3│  ← 后端服务器
└───────┘ └───────┘ └───────┘
1.3 负载均衡算法
OpenResty支持多种负载均衡算法:
- 轮询(Round Robin):依次分配请求
- 加权轮询(Weighted Round Robin):根据权重分配
- 最少连接(Least Connections):分配到连接数最少的服务器
- IP哈希(IP Hash):根据客户端IP哈希分配
- 一致性哈希(Consistent Hash):支持动态扩缩容(Lua 侧实现示例见本列表之后)
- 自定义算法:基于Lua实现复杂逻辑
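其中一致性哈希在 Lua 侧通常借助 lua-resty-balancer 库实现。下面是一个基于 resty.chash 的最小选择示例,仅作思路演示:节点列表中的地址与权重均为示例假设,实际应在 balancer_by_lua_block 等请求阶段中结合 ngx.balancer 使用。

-- 一致性哈希选择示例(假设已安装 lua-resty-balancer)
local resty_chash = require "resty.chash"

-- 节点表:key 为 "host:port",value 为权重(示例数据)
local nodes = {
    ["192.168.1.10:8080"] = 3,
    ["192.168.1.11:8080"] = 2,
    ["192.168.1.12:8080"] = 1,
}

local chash = resty_chash:new(nodes)

-- 以请求URI作为哈希键,同一URI总是落到同一后端
local id = chash:find(ngx.var.request_uri)   -- 返回 "host:port"
local host, port = id:match("^(.+):(%d+)$")
ngx.log(ngx.INFO, "consistent hash picked: ", host, ":", port)

与简单取模哈希相比,一致性哈希在节点增减时只会影响少量键的映射,更适合动态扩缩容的场景。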
2. Nginx负载均衡配置
2.1 基础负载均衡配置
# nginx.conf
http {
    # 定义后端服务器组
    upstream backend_servers {
        # 轮询算法(默认)
        server 192.168.1.10:8080;
        server 192.168.1.11:8080;
        server 192.168.1.12:8080;
    }

    # 加权轮询
    upstream weighted_backend {
        server 192.168.1.10:8080 weight=3;
        server 192.168.1.11:8080 weight=2;
        server 192.168.1.12:8080 weight=1;
    }

    # 最少连接
    upstream least_conn_backend {
        least_conn;
        server 192.168.1.10:8080;
        server 192.168.1.11:8080;
        server 192.168.1.12:8080;
    }

    # IP哈希
    upstream ip_hash_backend {
        ip_hash;
        server 192.168.1.10:8080;
        server 192.168.1.11:8080;
        server 192.168.1.12:8080;
    }

    # 一致性哈希
    upstream consistent_hash_backend {
        hash $request_uri consistent;
        server 192.168.1.10:8080;
        server 192.168.1.11:8080;
        server 192.168.1.12:8080;
    }

    server {
        listen 80;
        server_name api.example.com;

        location /api/ {
            proxy_pass http://backend_servers;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # 连接超时设置
            proxy_connect_timeout 5s;
            proxy_send_timeout 10s;
            proxy_read_timeout 10s;

            # 重试设置
            proxy_next_upstream error timeout invalid_header http_500 http_502 http_503 http_504;
            proxy_next_upstream_tries 3;
            proxy_next_upstream_timeout 10s;
        }
    }
}
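配置生效后,可以用 curl 简单验证轮询效果。下面的测试假设各后端实现了一个返回自身标识的 /api/whoami 接口(该接口仅为示例假设,可换成任意能区分后端的路径):

# 连续请求 6 次,观察响应是否轮流来自三台后端
for i in $(seq 1 6); do
    curl -s -H "Host: api.example.com" http://127.0.0.1/api/whoami
    echo
done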
2.2 高级负载均衡配置
# 高级负载均衡配置
upstream advanced_backend {
    # 服务器配置参数
    server 192.168.1.10:8080 weight=3 max_fails=2 fail_timeout=30s;
    server 192.168.1.11:8080 weight=2 max_fails=2 fail_timeout=30s;
    server 192.168.1.12:8080 weight=1 max_fails=2 fail_timeout=30s backup;
    server 192.168.1.13:8080 down;   # 临时下线

    # 连接保持
    keepalive 32;
    keepalive_requests 100;
    keepalive_timeout 60s;

    # 健康检查(需要nginx-plus或第三方模块)
    # health_check interval=5s fails=3 passes=2;
}

# 多层负载均衡
upstream app_servers {
    server 192.168.1.10:8080;
    server 192.168.1.11:8080;
}

upstream api_servers {
    server 192.168.1.20:8080;
    server 192.168.1.21:8080;
}

server {
    listen 80;
    server_name example.com;

    # 根据路径分发
    location /app/ {
        proxy_pass http://app_servers;
        include proxy_params;
    }

    location /api/ {
        proxy_pass http://api_servers;
        include proxy_params;
    }

    # 根据请求头分发
    location / {
        set $backend app_servers;
        if ($http_x_api_version) {
            set $backend api_servers;
        }
        proxy_pass http://$backend;
        include proxy_params;
    }
}
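上面注释掉的 health_check 指令依赖商业版或第三方C模块;在开源 OpenResty 中,更常见的替代方案是 lua-resty-upstream-healthcheck 做主动健康检查。下面是一个最小接入示例,其中共享字典名 healthcheck、被检查的上游名 backend_servers(沿用 2.1 节)以及检查路径 /health 均为示例假设:

# nginx.conf(http 块内)
lua_shared_dict healthcheck 1m;

init_worker_by_lua_block {
    local hc = require "resty.upstream.healthcheck"
    local ok, err = hc.spawn_checker{
        shm = "healthcheck",              -- 存放检查状态的共享字典
        upstream = "backend_servers",     -- 要检查的 upstream 名称
        type = "http",
        http_req = "GET /health HTTP/1.0\r\nHost: api.example.com\r\n\r\n",
        interval = 2000,                  -- 每 2 秒检查一次(毫秒)
        timeout = 1000,
        fall = 3,                         -- 连续失败 3 次标记为不可用
        rise = 2,                         -- 连续成功 2 次恢复
        valid_statuses = {200, 302},
        concurrency = 10,
    }
    if not ok then
        ngx.log(ngx.ERR, "failed to spawn health checker: ", err)
    end
}

该库会根据检查结果自动标记节点上下线,是对 Nginx 自带被动检查(max_fails/fail_timeout)的补充;它还提供 status_page() 用于输出各节点的检查状态。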
2.3 动态负载均衡配置
# 使用Lua实现动态负载均衡
upstream dynamic_backend {
    server 0.0.0.1;   # 占位符

    balancer_by_lua_block {
        local balancer = require "ngx.balancer"
        local backend_manager = require "backend_manager"

        -- 获取可用的后端服务器
        local server = backend_manager.get_server()
        if not server then
            ngx.log(ngx.ERR, "No available backend servers")
            return ngx.exit(503)
        end

        -- 设置后端服务器
        local ok, err = balancer.set_current_peer(server.host, server.port)
        if not ok then
            ngx.log(ngx.ERR, "Failed to set current peer: ", err)
            return ngx.exit(503)
        end

        -- 设置连接超时
        balancer.set_timeouts(5, 10, 10)
    }
}
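在 balancer_by_lua_block 中还可以结合 ngx.balancer 的 set_more_tries 与 get_last_failure 实现失败重试和失败上报。下面是一个思路性的示例片段:其中重试次数 2 为示例值,mark_server_failed 引用的是 3.1 节的后端管理模块,选中节点暂存在 ngx.ctx.last_server 是本示例自行约定的做法。

balancer_by_lua_block {
    local balancer = require "ngx.balancer"
    local backend_manager = require "backend_manager"

    -- 如果是重试请求,先查看上一次失败原因并上报
    local state, status = balancer.get_last_failure()
    if state then
        ngx.log(ngx.WARN, "last attempt failed: ", state, ", status: ", status)
        if ngx.ctx.last_server then
            backend_manager.mark_server_failed(ngx.ctx.last_server)
        end
    else
        -- 第一次进入 balancer 阶段时,允许最多再重试 2 次
        local ok, err = balancer.set_more_tries(2)
        if not ok then
            ngx.log(ngx.WARN, "set_more_tries failed: ", err)
        end
    end

    local server = backend_manager.get_server()
    if not server then
        return ngx.exit(503)
    end
    ngx.ctx.last_server = server   -- 供重试与 log 阶段使用

    local ok, err = balancer.set_current_peer(server.host, server.port)
    if not ok then
        ngx.log(ngx.ERR, "failed to set current peer: ", err)
        return ngx.exit(503)
    end
}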
3. Lua负载均衡实现
3.1 后端服务器管理模块
-- 后端服务器管理模块
local backend_manager = {}
local cjson = require "cjson"
local resty_lock = require "resty.lock"

-- 服务器配置
local servers = {
    {
        host = "192.168.1.10",
        port = 8080,
        weight = 3,
        max_fails = 2,
        fail_timeout = 30,
        current_fails = 0,
        last_fail_time = 0,
        status = "up"   -- up, down, backup
    },
    {
        host = "192.168.1.11",
        port = 8080,
        weight = 2,
        max_fails = 2,
        fail_timeout = 30,
        current_fails = 0,
        last_fail_time = 0,
        status = "up"
    },
    {
        host = "192.168.1.12",
        port = 8080,
        weight = 1,
        max_fails = 2,
        fail_timeout = 30,
        current_fails = 0,
        last_fail_time = 0,
        status = "backup"
    }
}

-- 负载均衡状态
local lb_state = {
    current_index = 1,
    total_weight = 0,
    weighted_servers = {}
}

-- 初始化负载均衡器
function backend_manager.init()
    -- 计算总权重
    lb_state.total_weight = 0
    lb_state.weighted_servers = {}
    for _, server in ipairs(servers) do
        if server.status == "up" then
            lb_state.total_weight = lb_state.total_weight + server.weight
            -- 创建加权服务器列表
            for i = 1, server.weight do
                table.insert(lb_state.weighted_servers, server)
            end
        end
    end
    ngx.log(ngx.INFO, "Backend manager initialized with ",
            #lb_state.weighted_servers, " weighted servers")
end

-- 检查服务器健康状态
function backend_manager.check_server_health(server)
    local current_time = ngx.time()
    -- 如果在失败超时期内,检查是否可以恢复
    if server.current_fails > 0 and
       (current_time - server.last_fail_time) >= server.fail_timeout then
        server.current_fails = 0
        server.status = "up"
        ngx.log(ngx.INFO, "Server recovered: ", server.host, ":", server.port)
    end
    return server.status == "up"
end

-- 标记服务器失败
function backend_manager.mark_server_failed(server)
    server.current_fails = server.current_fails + 1
    server.last_fail_time = ngx.time()
    if server.current_fails >= server.max_fails then
        server.status = "down"
        ngx.log(ngx.WARN, "Server marked as down: ", server.host, ":", server.port)
        -- 重新初始化负载均衡器
        backend_manager.init()
    end
end
-- 轮询算法
function backend_manager.round_robin()
    local available_servers = {}
    for _, server in ipairs(servers) do
        if backend_manager.check_server_health(server) then
            table.insert(available_servers, server)
        end
    end
    if #available_servers == 0 then
        -- 尝试使用备份服务器
        for _, server in ipairs(servers) do
            if server.status == "backup" then
                table.insert(available_servers, server)
            end
        end
    end
    if #available_servers == 0 then
        return nil
    end
    -- 可用服务器数量可能变化,先把游标收敛到合法范围,避免取到 nil
    if lb_state.current_index > #available_servers then
        lb_state.current_index = 1
    end
    local server = available_servers[lb_state.current_index]
    lb_state.current_index = (lb_state.current_index % #available_servers) + 1
    return server
end
-- 加权轮询算法
function backend_manager.weighted_round_robin()
    if #lb_state.weighted_servers == 0 then
        backend_manager.init()
    end
    if #lb_state.weighted_servers == 0 then
        return nil
    end
    -- 最多遍历一轮加权列表,避免所有节点都不健康时无限递归
    for _ = 1, #lb_state.weighted_servers do
        if lb_state.current_index > #lb_state.weighted_servers then
            lb_state.current_index = 1
        end
        local server = lb_state.weighted_servers[lb_state.current_index]
        lb_state.current_index = (lb_state.current_index % #lb_state.weighted_servers) + 1
        if backend_manager.check_server_health(server) then
            return server
        end
    end
    -- 一轮之内没有找到健康节点
    return nil
end
-- 最少连接算法
function backend_manager.least_connections()
    local min_connections = math.huge
    local selected_server = nil
    for _, server in ipairs(servers) do
        if backend_manager.check_server_health(server) then
            local connections = backend_manager.get_server_connections(server)
            if connections < min_connections then
                min_connections = connections
                selected_server = server
            end
        end
    end
    return selected_server
end

-- 获取服务器连接数(模拟)
function backend_manager.get_server_connections(server)
    local cache = ngx.shared.backend_stats
    if not cache then
        return 0
    end
    local key = server.host .. ":" .. server.port .. ":connections"
    return cache:get(key) or 0
end

-- IP哈希算法
function backend_manager.ip_hash()
    local client_ip = ngx.var.remote_addr
    local hash = ngx.crc32_long(client_ip)
    local available_servers = {}
    for _, server in ipairs(servers) do
        if backend_manager.check_server_health(server) then
            table.insert(available_servers, server)
        end
    end
    if #available_servers == 0 then
        return nil
    end
    local index = (hash % #available_servers) + 1
    return available_servers[index]
end

-- 一致性哈希算法
function backend_manager.consistent_hash(key)
    key = key or ngx.var.request_uri
    local hash = ngx.crc32_long(key)
    -- 简化的一致性哈希实现
    local available_servers = {}
    for _, server in ipairs(servers) do
        if backend_manager.check_server_health(server) then
            table.insert(available_servers, server)
        end
    end
    if #available_servers == 0 then
        return nil
    end
    local index = (hash % #available_servers) + 1
    return available_servers[index]
end

-- 自定义负载均衡算法
function backend_manager.custom_algorithm()
    local current_time = ngx.time()
    local best_server = nil
    local best_score = -1
    for _, server in ipairs(servers) do
        if backend_manager.check_server_health(server) then
            -- 计算服务器评分(权重 + 健康度 + 响应时间)
            local connections = backend_manager.get_server_connections(server)
            local response_time = backend_manager.get_server_response_time(server)
            local score = server.weight * 100 - connections * 10 - response_time
            if score > best_score then
                best_score = score
                best_server = server
            end
        end
    end
    return best_server
end

-- 获取服务器响应时间(模拟)
function backend_manager.get_server_response_time(server)
    local cache = ngx.shared.backend_stats
    if not cache then
        return 0
    end
    local key = server.host .. ":" .. server.port .. ":response_time"
    return cache:get(key) or 0
end

-- 获取服务器(主入口)
function backend_manager.get_server(algorithm)
    algorithm = algorithm or "weighted_round_robin"
    local server = nil
    if algorithm == "round_robin" then
        server = backend_manager.round_robin()
    elseif algorithm == "weighted_round_robin" then
        server = backend_manager.weighted_round_robin()
    elseif algorithm == "least_connections" then
        server = backend_manager.least_connections()
    elseif algorithm == "ip_hash" then
        server = backend_manager.ip_hash()
    elseif algorithm == "consistent_hash" then
        server = backend_manager.consistent_hash()
    elseif algorithm == "custom" then
        server = backend_manager.custom_algorithm()
    else
        server = backend_manager.weighted_round_robin()
    end
    return server
end

-- 更新服务器统计信息
function backend_manager.update_server_stats(server, response_time, success)
    local cache = ngx.shared.backend_stats
    if not cache then
        return
    end
    local server_key = server.host .. ":" .. server.port
    -- 更新连接数
    local conn_key = server_key .. ":connections"
    local connections = cache:get(conn_key) or 0
    if success then
        cache:set(conn_key, math.max(0, connections - 1), 60)
    else
        cache:set(conn_key, connections + 1, 60)
    end
    -- 更新响应时间
    local rt_key = server_key .. ":response_time"
    cache:set(rt_key, response_time, 60)
    -- 如果请求失败,标记服务器失败
    if not success then
        backend_manager.mark_server_failed(server)
    end
end

-- 获取所有服务器状态
function backend_manager.get_servers_status()
    local status = {}
    for i, server in ipairs(servers) do
        local connections = backend_manager.get_server_connections(server)
        local response_time = backend_manager.get_server_response_time(server)
        status[i] = {
            host = server.host,
            port = server.port,
            weight = server.weight,
            status = server.status,
            current_fails = server.current_fails,
            max_fails = server.max_fails,
            connections = connections,
            response_time = response_time
        }
    end
    return status
end

return backend_manager
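该模块依赖名为 backend_stats 的共享字典,init() 需要在每个 worker 启动时调用,统计信息则适合在 log 阶段回写。下面是一段示例接入配置:lua_package_path、字典大小均为示例值,ngx.ctx.last_server 的写入方式沿用上文 2.3 节之后的重试示例。

# nginx.conf(http 块内)
lua_package_path "/etc/nginx/lua/?.lua;;";
lua_shared_dict backend_stats 10m;

init_worker_by_lua_block {
    require("backend_manager").init()
}

server {
    listen 80;

    location /api/ {
        proxy_pass http://dynamic_backend;

        # 请求结束后回写统计信息,供 least_connections / custom 算法参考
        log_by_lua_block {
            local backend_manager = require "backend_manager"
            local server = ngx.ctx.last_server
            if server then
                local rt = tonumber(ngx.var.upstream_response_time) or 0
                local success = (tonumber(ngx.var.status) or 500) < 500
                backend_manager.update_server_stats(server, rt, success)
            end
        }
    }
}

需要注意的是,模块内的 servers 表和 lb_state 都是每个 worker 私有的;若需要跨 worker 的一致视图,应把状态放入共享字典。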
3.2 健康检查模块
-- 健康检查模块
local health_checker = {}
local http = require "resty.http"
local cjson = require "cjson"

-- 健康检查配置
local health_config = {
    check_interval = 10,     -- 10秒检查一次
    timeout = 5,             -- 5秒超时
    check_path = "/health",
    expected_status = 200,
    max_fails = 3,
    recovery_time = 30
}

-- 执行健康检查
function health_checker.check_server(server)
    local httpc = http.new()
    httpc:set_timeout(health_config.timeout * 1000)
    local url = string.format("http://%s:%d%s", server.host, server.port, health_config.check_path)
    local res, err = httpc:request_uri(url, {
        method = "GET",
        headers = {
            ["User-Agent"] = "OpenResty-HealthChecker/1.0"
        }
    })
    if not res then
        ngx.log(ngx.WARN, "Health check failed for ", server.host, ":", server.port, " - ", err)
        return false
    end
    if res.status == health_config.expected_status then
        return true
    else
        ngx.log(ngx.WARN, "Health check failed for ", server.host, ":", server.port,
                " - status: ", res.status)
        return false
    end
end
-- 批量健康检查
function health_checker.check_all_servers(servers)
    local results = {}
    for _, server in ipairs(servers) do
        if server.status ~= "down" then
            local is_healthy = health_checker.check_server(server)
            -- 用 table.insert 保证 results 是连续数组,便于用 # 统计数量
            table.insert(results, {
                server = server,
                healthy = is_healthy,
                check_time = ngx.time()
            })
            -- 更新服务器状态
            if is_healthy then
                if server.status == "recovering" then
                    server.status = "up"
                    server.current_fails = 0
                    ngx.log(ngx.INFO, "Server recovered: ", server.host, ":", server.port)
                end
            else
                server.current_fails = (server.current_fails or 0) + 1
                if server.current_fails >= health_config.max_fails then
                    server.status = "down"
                    server.last_fail_time = ngx.time()
                    ngx.log(ngx.WARN, "Server marked as down: ", server.host, ":", server.port)
                end
            end
        else
            -- 检查下线服务器是否可以恢复
            local current_time = ngx.time()
            if server.last_fail_time and
               (current_time - server.last_fail_time) >= health_config.recovery_time then
                local is_healthy = health_checker.check_server(server)
                if is_healthy then
                    server.status = "recovering"
                    ngx.log(ngx.INFO, "Server entering recovery: ", server.host, ":", server.port)
                end
            end
        end
    end
    return results
end
-- 启动健康检查定时器
function health_checker.start_health_check(servers)
    local function check_health(premature)
        if premature then
            return
        end
        -- 执行健康检查
        local results = health_checker.check_all_servers(servers)
        -- 记录检查结果
        local healthy_count = 0
        for _, result in pairs(results) do
            if result.healthy then
                healthy_count = healthy_count + 1
            end
        end
        ngx.log(ngx.INFO, "Health check completed: ", healthy_count, "/", #results, " servers healthy")
        -- 重新设置定时器
        local ok, err = ngx.timer.at(health_config.check_interval, check_health)
        if not ok then
            ngx.log(ngx.ERR, "Failed to create health check timer: ", err)
        end
    end

    -- 启动初始定时器
    local ok, err = ngx.timer.at(health_config.check_interval, check_health)
    if not ok then
        ngx.log(ngx.ERR, "Failed to create initial health check timer: ", err)
    end
end

-- 获取健康检查报告
function health_checker.get_health_report(servers)
    local report = {
        timestamp = ngx.time(),
        total_servers = #servers,
        healthy_servers = 0,
        unhealthy_servers = 0,
        recovering_servers = 0,
        servers = {}
    }
    for _, server in ipairs(servers) do
        local server_info = {
            host = server.host,
            port = server.port,
            status = server.status,
            current_fails = server.current_fails or 0,
            last_fail_time = server.last_fail_time
        }
        if server.status == "up" then
            report.healthy_servers = report.healthy_servers + 1
        elseif server.status == "down" then
            report.unhealthy_servers = report.unhealthy_servers + 1
        elseif server.status == "recovering" then
            report.recovering_servers = report.recovering_servers + 1
        end
        table.insert(report.servers, server_info)
    end
    return report
end

return health_checker
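健康检查定时器通常在 init_worker 阶段启动,并且只在一个 worker 中运行,避免每个 worker 都重复探测。下面是一段示例接入方式;其中 backend_manager.get_servers() 是一个假想接口,表示取得后端管理模块内部的服务器对象列表,实际可按需在 3.1 节的模块中导出。

# nginx.conf(http 块内)
init_worker_by_lua_block {
    -- 只在 0 号 worker 中启动探测
    if ngx.worker.id() == 0 then
        local health_checker = require "health_checker"
        local backend_manager = require "backend_manager"
        -- get_servers() 为假想接口:返回服务器对象列表
        local servers = backend_manager.get_servers()
        health_checker.start_health_check(servers)
    end
}

check_server 基于 cosocket 实现,放在 ngx.timer 回调中执行是安全的;而每个 worker 的 Lua 状态相互独立,如需让检查结果影响所有 worker,应把状态写入共享字典。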
3.3 会话保持模块
-- 会话保持模块
local session_persistence = {}
local cjson = require "cjson"

-- 会话配置
local session_config = {
    method = "cookie",                 -- cookie, header, ip
    cookie_name = "BACKEND_SERVER",
    header_name = "X-Backend-Server",
    ttl = 3600                         -- 1小时
}

-- 基于Cookie的会话保持
function session_persistence.cookie_persistence(servers)
    local cookie_value = ngx.var["cookie_" .. session_config.cookie_name]
    if cookie_value then
        -- 查找对应的服务器
        for _, server in ipairs(servers) do
            local server_id = server.host .. ":" .. server.port
            if server_id == cookie_value and server.status == "up" then
                return server
            end
        end
    end
    -- 如果没有找到或服务器不可用,选择新的服务器
    local backend_manager = require "backend_manager"
    local server = backend_manager.get_server()
    if server then
        local server_id = server.host .. ":" .. server.port
        local cookie = string.format("%s=%s; Path=/; Max-Age=%d",
                                     session_config.cookie_name, server_id, session_config.ttl)
        ngx.header["Set-Cookie"] = cookie
    end
    return server
end

-- 基于请求头的会话保持
function session_persistence.header_persistence(servers)
    local header_value = ngx.var["http_" .. string.lower(string.gsub(session_config.header_name, "-", "_"))]
    if header_value then
        for _, server in ipairs(servers) do
            local server_id = server.host .. ":" .. server.port
            if server_id == header_value and server.status == "up" then
                return server
            end
        end
    end
    -- 选择新的服务器并设置响应头
    local backend_manager = require "backend_manager"
    local server = backend_manager.get_server()
    if server then
        local server_id = server.host .. ":" .. server.port
        ngx.header[session_config.header_name] = server_id
    end
    return server
end

-- 基于IP的会话保持
function session_persistence.ip_persistence(servers)
    local client_ip = ngx.var.remote_addr
    local cache = ngx.shared.session_cache
    if cache then
        local server_id = cache:get(client_ip)
        if server_id then
            for _, server in ipairs(servers) do
                local current_server_id = server.host .. ":" .. server.port
                if current_server_id == server_id and server.status == "up" then
                    return server
                end
            end
        end
    end
    -- 选择新的服务器并缓存
    local backend_manager = require "backend_manager"
    local server = backend_manager.get_server()
    if server and cache then
        local server_id = server.host .. ":" .. server.port
        cache:set(client_ip, server_id, session_config.ttl)
    end
    return server
end

-- 获取会话保持的服务器
function session_persistence.get_persistent_server(servers)
    if session_config.method == "cookie" then
        return session_persistence.cookie_persistence(servers)
    elseif session_config.method == "header" then
        return session_persistence.header_persistence(servers)
    elseif session_config.method == "ip" then
        return session_persistence.ip_persistence(servers)
    else
        -- 默认不使用会话保持
        local backend_manager = require "backend_manager"
        return backend_manager.get_server()
    end
end

-- 清理过期的会话
function session_persistence.cleanup_expired_sessions()
    local cache = ngx.shared.session_cache
    if not cache then
        return
    end
    -- 获取所有键
    local keys = cache:get_keys(1000)
    local cleaned_count = 0
    for _, key in ipairs(keys) do
        local value, flags, stale = cache:get_stale(key)
        if stale then
            cache:delete(key)
            cleaned_count = cleaned_count + 1
        end
    end
    if cleaned_count > 0 then
        ngx.log(ngx.INFO, "Cleaned ", cleaned_count, " expired sessions")
    end
end

-- 启动会话清理定时器
function session_persistence.start_cleanup_timer()
    local function cleanup_sessions(premature)
        if premature then
            return
        end
        session_persistence.cleanup_expired_sessions()
        -- 重新设置定时器(每小时清理一次)
        local ok, err = ngx.timer.at(3600, cleanup_sessions)
        if not ok then
            ngx.log(ngx.ERR, "Failed to create session cleanup timer: ", err)
        end
    end

    local ok, err = ngx.timer.at(3600, cleanup_sessions)
    if not ok then
        ngx.log(ngx.ERR, "Failed to create initial session cleanup timer: ", err)
    end
end

return session_persistence
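该模块假定存在名为 session_cache 的共享字典,并且会话选择需要在能读写请求/响应头的阶段(如 access)执行,到 balancer 阶段再写 Cookie 已经太晚。下面是一段示例接入配置,字典大小、location 路径为示例值,backend_manager.get_servers() 仍沿用上文的假想接口:

# nginx.conf(http 块内)
lua_shared_dict session_cache 10m;

server {
    listen 80;

    location /app/ {
        # 在 access 阶段完成会话保持选择,结果放入 ngx.ctx 供 balancer 阶段使用
        access_by_lua_block {
            local session_persistence = require "session_persistence"
            local backend_manager = require "backend_manager"
            -- get_servers() 为假想接口:返回服务器对象列表
            local servers = backend_manager.get_servers()
            ngx.ctx.picked_server = session_persistence.get_persistent_server(servers)
        }

        proxy_pass http://dynamic_backend;
    }
}

# dynamic_backend 的 balancer_by_lua_block 中可优先使用 ngx.ctx.picked_server,
# 不存在时再回退到 backend_manager.get_server()。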
4. 高可用架构设计
4.1 多层高可用架构
┌─────────────────────────────────────────────┐
│                 DNS负载均衡                  │
│           (example.com → 多个IP)             │
└──────────────────────┬──────────────────────┘
                       │
          ┌────────────┼────────────┐
          │            │            │
      ┌───▼───┐    ┌───▼───┐    ┌───▼───┐
      │ LB-1  │    │ LB-2  │    │ LB-3  │   ← OpenResty负载均衡器
      │ (主)  │    │ (备)  │    │ (备)  │
      └───┬───┘    └───┬───┘    └───┬───┘
          │            │            │
          └────────────┼────────────┘
                       │
          ┌────────────┼────────────┐
          │            │            │
      ┌───▼───┐    ┌───▼───┐    ┌───▼───┐
      │ App-1 │    │ App-2 │    │ App-3 │   ← 应用服务器
      └───┬───┘    └───┬───┘    └───┬───┘
          │            │            │
          └────────────┼────────────┘
                       │
           ┌───────────▼───────────┐
           │      数据库集群       │   ← 数据库高可用
           │      (主从/集群)      │
           └───────────────────────┘
4.2 Keepalived配置
# /etc/keepalived/keepalived.conf (主服务器)
vrrp_script chk_nginx {
    script "/etc/keepalived/check_nginx.sh"
    interval 2
    weight -2
    fall 3
    rise 2
}

vrrp_instance VI_1 {
    state MASTER
    interface eth0
    virtual_router_id 51
    priority 101
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1111
    }
    virtual_ipaddress {
        192.168.1.100
    }
    track_script {
        chk_nginx
    }
    notify_master "/etc/keepalived/notify_master.sh"
    notify_backup "/etc/keepalived/notify_backup.sh"
    notify_fault "/etc/keepalived/notify_fault.sh"
}

#!/bin/bash
# /etc/keepalived/check_nginx.sh
if [ $(ps -C nginx --no-header | wc -l) -eq 0 ]; then
    systemctl start nginx
    sleep 2
    if [ $(ps -C nginx --no-header | wc -l) -eq 0 ]; then
        exit 1
    fi
fi
exit 0
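与主节点对应,备用节点只需调整 state 与 priority,其余参数(virtual_router_id、认证、VIP 等)必须与主节点一致。下面是一个示例配置,网卡名与优先级均为示例值:

# /etc/keepalived/keepalived.conf (备用服务器)
# vrrp_script chk_nginx 的定义与主节点相同,此处省略
vrrp_instance VI_1 {
    state BACKUP                 # 备用角色
    interface eth0
    virtual_router_id 51         # 必须与主节点相同
    priority 100                 # 低于主节点的 101
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1111           # 必须与主节点相同
    }
    virtual_ipaddress {
        192.168.1.100
    }
    track_script {
        chk_nginx
    }
}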
4.3 故障转移配置
# 故障转移配置
upstream primary_backend {
    server 192.168.1.10:8080 max_fails=2 fail_timeout=30s;
    server 192.168.1.11:8080 max_fails=2 fail_timeout=30s;
    server 192.168.1.12:8080 backup;   # 备份服务器
}

upstream secondary_backend {
    server 192.168.2.10:8080;
    server 192.168.2.11:8080;
}

server {
    listen 80;
    server_name api.example.com;

    location / {
        # 主要后端
        proxy_pass http://primary_backend;
        proxy_next_upstream error timeout invalid_header http_500 http_502 http_503 http_504;
        proxy_next_upstream_tries 3;
        proxy_next_upstream_timeout 10s;

        # 故障转移到备用后端
        error_page 502 503 504 = @fallback;
    }

    location @fallback {
        proxy_pass http://secondary_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # 添加故障转移标识
        add_header X-Served-By "fallback-backend" always;
    }
}
5. 监控与管理
5.1 负载均衡监控API
-- 负载均衡监控API
local monitoring_api = {}
local cjson = require "cjson"
local backend_manager = require "backend_manager"
local health_checker = require "health_checker"

-- 获取负载均衡状态
function monitoring_api.get_lb_status()
    local servers = backend_manager.get_servers_status()
    local health_report = health_checker.get_health_report(servers)
    local status = {
        timestamp = ngx.time(),
        load_balancer = {
            algorithm = "weighted_round_robin",
            total_servers = #servers,
            healthy_servers = health_report.healthy_servers,
            unhealthy_servers = health_report.unhealthy_servers
        },
        servers = servers
    }
    return status
end

-- 获取服务器详细信息
function monitoring_api.get_server_details(host, port)
    local servers = backend_manager.get_servers_status()
    for _, server in ipairs(servers) do
        if server.host == host and server.port == tonumber(port) then
            return server
        end
    end
    return nil
end

-- 手动设置服务器状态
function monitoring_api.set_server_status(host, port, status)
    -- 这里应该实现服务器状态的手动设置逻辑
    -- 例如:临时下线服务器进行维护
    local result = {
        success = true,
        message = string.format("Server %s:%s status set to %s", host, port, status)
    }
    return result
end

-- 获取负载均衡统计信息
function monitoring_api.get_lb_stats()
    local cache = ngx.shared.backend_stats
    if not cache then
        return {error = "Stats cache not available"}
    end
    local stats = {
        timestamp = ngx.time(),
        total_requests = cache:get("total_requests") or 0,
        successful_requests = cache:get("successful_requests") or 0,
        failed_requests = cache:get("failed_requests") or 0,
        avg_response_time = cache:get("avg_response_time") or 0
    }
    stats.success_rate = stats.total_requests > 0 and
        (stats.successful_requests / stats.total_requests) or 0
    return stats
end
-- 处理监控API请求
function monitoring_api.handle_request()
    local method = ngx.var.request_method
    local uri = ngx.var.uri
    local args = ngx.req.get_uri_args()
    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    local response = {}

    if method == "GET" then
        if uri == "/api/lb/status" then
            response = monitoring_api.get_lb_status()
        elseif uri == "/api/lb/stats" then
            response = monitoring_api.get_lb_stats()
        elseif uri == "/api/lb/server" then
            if args.host and args.port then
                response = monitoring_api.get_server_details(args.host, args.port)
                if not response then
                    ngx.status = 404
                    response = {error = "Server not found"}
                end
            else
                ngx.status = 400
                response = {error = "Missing host or port parameter"}
            end
        else
            ngx.status = 404
            response = {error = "API endpoint not found"}
        end
    elseif method == "POST" then
        if uri == "/api/lb/server/status" then
            if body then
                -- cjson.decode 遇到非法 JSON 会抛错,用 pcall 保护
                local ok, data = pcall(cjson.decode, body)
                if ok and type(data) == "table" and data.host and data.port and data.status then
                    response = monitoring_api.set_server_status(data.host, data.port, data.status)
                else
                    ngx.status = 400
                    response = {error = "Missing required parameters or invalid JSON"}
                end
            else
                ngx.status = 400
                response = {error = "Request body required"}
            end
        else
            ngx.status = 404
            response = {error = "API endpoint not found"}
        end
    else
        ngx.status = 405
        response = {error = "Method not allowed"}
    end

    ngx.header.content_type = "application/json"
    ngx.say(cjson.encode(response))
end

return monitoring_api
5.2 管理界面配置
# 管理界面配置
server {
    listen 8080;
    server_name admin.example.com;

    # 访问控制
    allow 192.168.1.0/24;
    allow 10.0.0.0/8;
    deny all;

    # 基础认证
    auth_basic "Load Balancer Admin";
    auth_basic_user_file /etc/nginx/.htpasswd;

    location / {
        root /var/www/lb-admin;
        index index.html;
    }

    # API接口
    location /api/ {
        content_by_lua_block {
            local monitoring_api = require "monitoring_api"
            monitoring_api.handle_request()
        }
    }

    # WebSocket支持(实时监控)
    location /ws {
        proxy_pass http://127.0.0.1:8081;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
    }
}
6. 性能优化
6.1 连接池优化
# 连接池优化配置
upstream optimized_backend {
    server 192.168.1.10:8080;
    server 192.168.1.11:8080;
    server 192.168.1.12:8080;

    # 连接池设置
    keepalive 100;               # 保持100个空闲连接
    keepalive_requests 1000;     # 每个连接最多处理1000个请求
    keepalive_timeout 60s;       # 空闲连接超时时间
}

server {
    location / {
        proxy_pass http://optimized_backend;

        # HTTP/1.1支持
        proxy_http_version 1.1;
        proxy_set_header Connection "";

        # 缓冲区优化
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
        proxy_busy_buffers_size 8k;

        # 超时优化
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;
    }
}
6.2 缓存优化
-- 缓存优化模块
local cache_optimizer = {}
local lrucache = require "resty.lrucache"

-- 创建LRU缓存
local server_cache = lrucache.new(1000)   -- 缓存1000个条目

-- 缓存服务器选择结果
function cache_optimizer.get_cached_server(cache_key)
    return server_cache:get(cache_key)
end

function cache_optimizer.set_cached_server(cache_key, server, ttl)
    server_cache:set(cache_key, server, ttl or 60)
end

-- 智能缓存键生成
function cache_optimizer.generate_cache_key(algorithm, client_info)
    if algorithm == "ip_hash" then
        return "ip:" .. (client_info.ip or "unknown")
    elseif algorithm == "consistent_hash" then
        return "uri:" .. (client_info.uri or "unknown")
    elseif algorithm == "session" then
        return "session:" .. (client_info.session_id or "unknown")
    else
        return "default"
    end
end

return cache_optimizer
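下面是把选择结果缓存与后端选择结合起来的一个示意片段,假设在 access 或 balancer 等请求阶段调用;需要注意 resty.lrucache 的缓存是每个 worker 独立的,缓存时间 60 秒为示例值。

-- 用缓存优化模块包装服务器选择(示例)
local cache_optimizer = require "cache_optimizer"
local backend_manager = require "backend_manager"

local function pick_server(algorithm)
    local client_info = {
        ip = ngx.var.remote_addr,
        uri = ngx.var.request_uri,
    }
    local cache_key = cache_optimizer.generate_cache_key(algorithm, client_info)

    -- 先查本 worker 的 LRU 缓存,命中则直接复用上次的选择
    local server = cache_optimizer.get_cached_server(cache_key)
    if server then
        return server
    end

    -- 未命中则按指定算法选择,并缓存 60 秒
    server = backend_manager.get_server(algorithm)
    if server then
        cache_optimizer.set_cached_server(cache_key, server, 60)
    end
    return server
end

-- 例如在 balancer 阶段:local server = pick_server("ip_hash")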
总结
负载均衡与高可用是OpenResty应用架构的核心组件。通过合理的配置和实现,可以:
- 提高系统可用性:通过多服务器部署和故障转移
- 优化性能:通过智能负载分配和连接池
- 支持扩展:通过动态服务器管理和会话保持
- 保障稳定性:通过健康检查和监控告警
- 简化运维:通过自动化管理和可视化界面
关键最佳实践:
- 选择合适的负载均衡算法
- 实施完善的健康检查机制
- 配置合理的故障转移策略
- 优化连接池和缓存设置
- 建立完善的监控体系