本章概述
本章将详细介绍OpenVPN服务端的配置与部署,包括服务端配置文件的详细设置、网络拓扑的选择、路由配置、用户管理以及服务的启动与管理。通过本章的学习,您将能够独立搭建和配置一个功能完整的OpenVPN服务端。
flowchart TD
A[服务端配置] --> B[基础配置]
A --> C[网络配置]
A --> D[安全配置]
A --> E[用户管理]
B --> B1[监听端口]
B --> B2[协议选择]
B --> B3[设备类型]
C --> C1[IP地址池]
C --> C2[路由推送]
C --> C3[DNS配置]
D --> D1[证书配置]
D --> D2[加密设置]
D --> D3[认证方式]
E --> E1[客户端管理]
E --> E2[权限控制]
E --> E3[连接限制]
4.1 服务端基础配置
4.1.1 配置文件结构
OpenVPN服务端配置文件通常命名为server.conf
,包含以下主要部分:
#!/bin/bash
# 创建服务端配置文件
cat > /etc/openvpn/server/server.conf << 'EOF'
# OpenVPN服务端配置文件
# 基础配置部分
#============================================
# 基本设置
#============================================
# 监听端口(默认1194)
port 1194
# 协议类型(udp或tcp)
proto udp
# 设备类型(tun或tap)
dev tun
# CA根证书
ca /etc/openvpn/server/ca.crt
# 服务端证书
cert /etc/openvpn/server/server.crt
# 服务端私钥
key /etc/openvpn/server/server.key
# Diffie-Hellman参数文件
dh /etc/openvpn/server/dh.pem
#============================================
# 网络配置
#============================================
# VPN网络地址池
server 10.8.0.0 255.255.255.0
# 保持连接状态
keepalive 10 120
# 数据压缩
comp-lzo
# 允许客户端之间通信
client-to-client
# 持久化选项
persist-key
persist-tun
#============================================
# 安全配置
#============================================
# TLS认证密钥
tls-auth /etc/openvpn/server/ta.key 0
# 加密算法
cipher AES-256-GCM
# 认证算法
auth SHA256
# TLS版本限制
tls-version-min 1.2
#============================================
# 日志配置
#============================================
# 状态文件
status /var/log/openvpn/openvpn-status.log
# 日志文件
log-append /var/log/openvpn/openvpn.log
# 日志级别(0-9)
verb 3
# 静默重复消息
mute 20
EOF
echo "服务端配置文件创建完成"
4.1.2 网络拓扑配置
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
OpenVPN网络拓扑配置管理器
支持不同的网络拓扑模式配置
"""
import os
import ipaddress
from typing import Dict, List, Optional
class NetworkTopologyManager:
"""网络拓扑配置管理器"""
def __init__(self, config_dir: str = "/etc/openvpn/server"):
self.config_dir = config_dir
self.topology_configs = {
'subnet': self._subnet_topology,
'net30': self._net30_topology,
'p2p': self._p2p_topology
}
def _subnet_topology(self, network: str, netmask: str) -> Dict[str, str]:
"""子网拓扑配置"""
return {
'topology': 'subnet',
'server': f'{network} {netmask}',
'description': '推荐用于大多数场景,支持客户端间通信'
}
def _net30_topology(self, network: str, netmask: str) -> Dict[str, str]:
"""Net30拓扑配置"""
return {
'topology': 'net30',
'server': f'{network} {netmask}',
'description': '传统模式,每个客户端使用/30子网'
}
def _p2p_topology(self, network: str, netmask: str) -> Dict[str, str]:
"""点对点拓扑配置"""
return {
'topology': 'p2p',
'server': f'{network} {netmask}',
'description': '点对点模式,适用于简单连接'
}
def generate_topology_config(self,
topology_type: str,
network: str = "10.8.0.0",
netmask: str = "255.255.255.0") -> str:
"""生成网络拓扑配置"""
if topology_type not in self.topology_configs:
raise ValueError(f"不支持的拓扑类型: {topology_type}")
config = self.topology_configs[topology_type](network, netmask)
config_lines = [
f"# 网络拓扑配置 - {config['description']}",
f"topology {config['topology']}",
f"server {config['server']}",
""
]
return "\n".join(config_lines)
def calculate_client_capacity(self, network: str, netmask: str, topology: str) -> int:
"""计算客户端容量"""
try:
net = ipaddress.IPv4Network(f"{network}/{netmask}", strict=False)
if topology == 'subnet':
# 子网模式:总地址数 - 网络地址 - 广播地址 - 服务器地址
return net.num_addresses - 3
elif topology == 'net30':
# Net30模式:每4个地址分配给一个客户端
return (net.num_addresses - 4) // 4
elif topology == 'p2p':
# 点对点模式:只支持一个客户端
return 1
else:
return 0
except Exception as e:
print(f"计算客户端容量时出错: {e}")
return 0
def validate_network_config(self, network: str, netmask: str) -> bool:
"""验证网络配置"""
try:
ipaddress.IPv4Network(f"{network}/{netmask}", strict=False)
return True
except Exception:
return False
def generate_routing_config(self, routes: List[Dict[str, str]]) -> str:
"""生成路由配置"""
config_lines = ["# 路由配置"]
for route in routes:
network = route.get('network')
netmask = route.get('netmask')
gateway = route.get('gateway', '')
if network and netmask:
if gateway:
config_lines.append(f"route {network} {netmask} {gateway}")
else:
config_lines.append(f"route {network} {netmask}")
config_lines.append("")
return "\n".join(config_lines)
def generate_push_config(self, push_options: List[str]) -> str:
"""生成推送配置"""
config_lines = ["# 推送给客户端的配置"]
for option in push_options:
config_lines.append(f"push \"{option}\"")
config_lines.append("")
return "\n".join(config_lines)
# 使用示例
if __name__ == "__main__":
manager = NetworkTopologyManager()
# 生成子网拓扑配置
subnet_config = manager.generate_topology_config('subnet', '10.8.0.0', '255.255.255.0')
print("子网拓扑配置:")
print(subnet_config)
# 计算客户端容量
capacity = manager.calculate_client_capacity('10.8.0.0', '255.255.255.0', 'subnet')
print(f"客户端容量: {capacity}")
# 生成路由配置
routes = [
{'network': '192.168.1.0', 'netmask': '255.255.255.0'},
{'network': '172.16.0.0', 'netmask': '255.255.0.0'}
]
routing_config = manager.generate_routing_config(routes)
print("路由配置:")
print(routing_config)
# 生成推送配置
push_options = [
'redirect-gateway def1 bypass-dhcp',
'dhcp-option DNS 8.8.8.8',
'dhcp-option DNS 8.8.4.4'
]
push_config = manager.generate_push_config(push_options)
print("推送配置:")
print(push_config)
4.2 高级网络配置
4.2.1 路由配置
#!/bin/bash
# OpenVPN路由配置脚本
# 配置服务端路由
configure_server_routing() {
local config_file="/etc/openvpn/server/server.conf"
echo "配置服务端路由..."
# 添加路由配置到服务端配置文件
cat >> "$config_file" << 'EOF'
#============================================
# 路由配置
#============================================
# 推送默认网关(全流量通过VPN)
push "redirect-gateway def1 bypass-dhcp"
# 推送DNS服务器
push "dhcp-option DNS 8.8.8.8"
push "dhcp-option DNS 8.8.4.4"
# 推送特定路由
push "route 192.168.1.0 255.255.255.0"
push "route 172.16.0.0 255.255.0.0"
# 服务端本地路由
route 10.8.0.0 255.255.255.0
# 客户端特定路由(通过客户端配置目录)
client-config-dir /etc/openvpn/server/ccd
EOF
echo "路由配置完成"
}
# 创建客户端配置目录
create_client_config_dir() {
local ccd_dir="/etc/openvpn/server/ccd"
echo "创建客户端配置目录..."
mkdir -p "$ccd_dir"
# 示例:为特定客户端配置固定IP
cat > "$ccd_dir/client1" << 'EOF'
# 客户端client1的特定配置
ifconfig-push 10.8.0.100 10.8.0.101
# 为此客户端推送特定路由
push "route 192.168.100.0 255.255.255.0"
EOF
cat > "$ccd_dir/client2" << 'EOF'
# 客户端client2的特定配置
ifconfig-push 10.8.0.102 10.8.0.103
# 允许此客户端访问内网
iroute 192.168.2.0 255.255.255.0
EOF
echo "客户端配置目录创建完成"
}
# 配置IP转发
enable_ip_forwarding() {
echo "启用IP转发..."
# 临时启用
echo 1 > /proc/sys/net/ipv4/ip_forward
# 永久启用
if ! grep -q "net.ipv4.ip_forward=1" /etc/sysctl.conf; then
echo "net.ipv4.ip_forward=1" >> /etc/sysctl.conf
fi
# 应用配置
sysctl -p
echo "IP转发已启用"
}
# 配置防火墙规则
configure_firewall() {
echo "配置防火墙规则..."
# 获取外网接口名称
local external_interface=$(ip route | grep default | awk '{print $5}' | head -n1)
if [ -z "$external_interface" ]; then
echo "警告:无法自动检测外网接口,请手动配置"
external_interface="eth0"
fi
echo "外网接口: $external_interface"
# 配置iptables规则
iptables -t nat -A POSTROUTING -s 10.8.0.0/24 -o "$external_interface" -j MASQUERADE
iptables -A INPUT -i tun+ -j ACCEPT
iptables -A FORWARD -i tun+ -j ACCEPT
iptables -A FORWARD -i tun+ -o "$external_interface" -m state --state RELATED,ESTABLISHED -j ACCEPT
iptables -A FORWARD -i "$external_interface" -o tun+ -m state --state RELATED,ESTABLISHED -j ACCEPT
# 保存iptables规则
if command -v iptables-save > /dev/null; then
iptables-save > /etc/iptables/rules.v4
fi
echo "防火墙规则配置完成"
}
# 主函数
main() {
echo "开始配置OpenVPN服务端路由..."
configure_server_routing
create_client_config_dir
enable_ip_forwarding
configure_firewall
echo "OpenVPN服务端路由配置完成!"
}
# 执行主函数
main
4.2.2 DNS配置
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
OpenVPN DNS配置管理器
管理DNS服务器配置和域名解析
"""
import os
import re
from typing import List, Dict, Optional
class DNSConfigManager:
"""DNS配置管理器"""
def __init__(self, config_file: str = "/etc/openvpn/server/server.conf"):
self.config_file = config_file
self.dns_servers = {
'google': ['8.8.8.8', '8.8.4.4'],
'cloudflare': ['1.1.1.1', '1.0.0.1'],
'opendns': ['208.67.222.222', '208.67.220.220'],
'quad9': ['9.9.9.9', '149.112.112.112']
}
def add_dns_servers(self, provider: str = 'google', custom_servers: Optional[List[str]] = None) -> str:
"""添加DNS服务器配置"""
if custom_servers:
servers = custom_servers
elif provider in self.dns_servers:
servers = self.dns_servers[provider]
else:
raise ValueError(f"不支持的DNS提供商: {provider}")
config_lines = ["# DNS服务器配置"]
for server in servers:
if self._validate_ip(server):
config_lines.append(f'push "dhcp-option DNS {server}"')
else:
print(f"警告:无效的DNS服务器地址 {server}")
config_lines.append("")
return "\n".join(config_lines)
def add_dns_domains(self, domains: List[str]) -> str:
"""添加DNS域名配置"""
config_lines = ["# DNS域名配置"]
for domain in domains:
if self._validate_domain(domain):
config_lines.append(f'push "dhcp-option DOMAIN {domain}"')
else:
print(f"警告:无效的域名 {domain}")
config_lines.append("")
return "\n".join(config_lines)
def add_dns_search_domains(self, search_domains: List[str]) -> str:
"""添加DNS搜索域配置"""
config_lines = ["# DNS搜索域配置"]
for domain in search_domains:
if self._validate_domain(domain):
config_lines.append(f'push "dhcp-option DOMAIN-SEARCH {domain}"')
else:
print(f"警告:无效的搜索域 {domain}")
config_lines.append("")
return "\n".join(config_lines)
def generate_dns_config(self,
provider: str = 'google',
custom_servers: Optional[List[str]] = None,
domains: Optional[List[str]] = None,
search_domains: Optional[List[str]] = None) -> str:
"""生成完整的DNS配置"""
config_parts = []
# DNS服务器配置
dns_config = self.add_dns_servers(provider, custom_servers)
config_parts.append(dns_config)
# 域名配置
if domains:
domain_config = self.add_dns_domains(domains)
config_parts.append(domain_config)
# 搜索域配置
if search_domains:
search_config = self.add_dns_search_domains(search_domains)
config_parts.append(search_config)
return "\n".join(config_parts)
def _validate_ip(self, ip: str) -> bool:
"""验证IP地址格式"""
pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
return bool(re.match(pattern, ip))
def _validate_domain(self, domain: str) -> bool:
"""验证域名格式"""
pattern = r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+'r'[a-zA-Z]{2,}$'
return bool(re.match(pattern, domain)) and len(domain) <= 253
def create_dns_test_script(self) -> str:
"""创建DNS测试脚本"""
script_content = '''#!/bin/bash
# DNS配置测试脚本
echo "=== OpenVPN DNS配置测试 ==="
# 测试DNS解析
test_dns_resolution() {
local test_domains=("google.com" "github.com" "stackoverflow.com")
echo "测试DNS解析..."
for domain in "${test_domains[@]}"; do
if nslookup "$domain" > /dev/null 2>&1; then
echo "✓ $domain 解析成功"
else
echo "✗ $domain 解析失败"
fi
done
}
# 检查DNS服务器连通性
test_dns_connectivity() {
local dns_servers=("8.8.8.8" "1.1.1.1" "208.67.222.222")
echo "\n测试DNS服务器连通性..."
for server in "${dns_servers[@]}"; do
if ping -c 1 -W 3 "$server" > /dev/null 2>&1; then
echo "✓ $server 连通正常"
else
echo "✗ $server 连通失败"
fi
done
}
# 显示当前DNS配置
show_dns_config() {
echo "\n当前DNS配置:"
if [ -f /etc/resolv.conf ]; then
cat /etc/resolv.conf
else
echo "未找到DNS配置文件"
fi
}
# 执行测试
test_dns_resolution
test_dns_connectivity
show_dns_config
echo "\n=== DNS测试完成 ==="
'''
return script_content
def get_recommended_dns_config(self, use_case: str) -> Dict[str, any]:
"""获取推荐的DNS配置"""
configs = {
'general': {
'provider': 'google',
'description': '通用配置,适合大多数场景'
},
'privacy': {
'provider': 'cloudflare',
'description': '注重隐私保护的配置'
},
'security': {
'provider': 'quad9',
'description': '注重安全防护的配置'
},
'family': {
'provider': 'opendns',
'description': '家庭友好的配置,包含内容过滤'
}
}
return configs.get(use_case, configs['general'])
# 使用示例
if __name__ == "__main__":
dns_manager = DNSConfigManager()
# 生成基本DNS配置
basic_config = dns_manager.generate_dns_config('google')
print("基本DNS配置:")
print(basic_config)
# 生成高级DNS配置
advanced_config = dns_manager.generate_dns_config(
provider='cloudflare',
domains=['company.local'],
search_domains=['internal.company.com', 'dev.company.com']
)
print("高级DNS配置:")
print(advanced_config)
# 创建DNS测试脚本
test_script = dns_manager.create_dns_test_script()
print("DNS测试脚本:")
print(test_script[:200] + "...")
# 获取推荐配置
recommendation = dns_manager.get_recommended_dns_config('privacy')
print(f"\n推荐配置: {recommendation}")
4.3 服务管理与启动
4.3.1 系统服务配置
#!/bin/bash
# OpenVPN服务管理脚本
# 创建systemd服务文件
create_systemd_service() {
echo "创建OpenVPN systemd服务文件..."
cat > /etc/systemd/system/openvpn-server@.service << 'EOF'
[Unit]
Description=OpenVPN service for %I
After=syslog.target network-online.target
Wants=network-online.target
Documentation=man:openvpn(8)
Documentation=https://community.openvpn.net/openvpn/wiki/Openvpn24ManPage
Documentation=https://community.openvpn.net/openvpn/wiki/HOWTO
[Service]
Type=notify
PrivateTmp=true
WorkingDirectory=/etc/openvpn/server
ExecStart=/usr/sbin/openvpn --status %t/openvpn-server/status-%i.log --status-version 2 --suppress-timestamps --config %i.conf
CapabilityBoundingSet=CAP_IPC_LOCK CAP_NET_ADMIN CAP_NET_BIND_SERVICE CAP_NET_RAW CAP_SETGID CAP_SETUID CAP_SYS_CHROOT CAP_DAC_OVERRIDE CAP_AUDIT_WRITE
LimitNPROC=100
DeviceAllow=/dev/null rw
DeviceAllow=/dev/net/tun rw
ProtectSystem=true
ProtectHome=true
KillMode=process
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
EOF
echo "systemd服务文件创建完成"
}
# 启动和启用服务
start_openvpn_service() {
local config_name="server"
echo "启动OpenVPN服务..."
# 重新加载systemd配置
systemctl daemon-reload
# 启用服务(开机自启)
systemctl enable openvpn-server@${config_name}.service
# 启动服务
systemctl start openvpn-server@${config_name}.service
# 检查服务状态
if systemctl is-active --quiet openvpn-server@${config_name}.service; then
echo "✓ OpenVPN服务启动成功"
else
echo "✗ OpenVPN服务启动失败"
systemctl status openvpn-server@${config_name}.service
return 1
fi
}
# 检查服务状态
check_service_status() {
local config_name="server"
echo "=== OpenVPN服务状态 ==="
systemctl status openvpn-server@${config_name}.service
echo "\n=== 服务日志 ==="
journalctl -u openvpn-server@${config_name}.service --no-pager -n 20
echo "\n=== 连接状态 ==="
if [ -f /var/log/openvpn/openvpn-status.log ]; then
cat /var/log/openvpn/openvpn-status.log
else
echo "状态文件不存在"
fi
}
# 重启服务
restart_service() {
local config_name="server"
echo "重启OpenVPN服务..."
systemctl restart openvpn-server@${config_name}.service
sleep 3
if systemctl is-active --quiet openvpn-server@${config_name}.service; then
echo "✓ OpenVPN服务重启成功"
else
echo "✗ OpenVPN服务重启失败"
return 1
fi
}
# 停止服务
stop_service() {
local config_name="server"
echo "停止OpenVPN服务..."
systemctl stop openvpn-server@${config_name}.service
if ! systemctl is-active --quiet openvpn-server@${config_name}.service; then
echo "✓ OpenVPN服务已停止"
else
echo "✗ OpenVPN服务停止失败"
return 1
fi
}
# 创建日志目录
create_log_directory() {
echo "创建日志目录..."
mkdir -p /var/log/openvpn
mkdir -p /var/run/openvpn-server
# 设置权限
chown nobody:nogroup /var/log/openvpn
chmod 755 /var/log/openvpn
echo "日志目录创建完成"
}
# 配置日志轮转
configure_log_rotation() {
echo "配置日志轮转..."
cat > /etc/logrotate.d/openvpn << 'EOF'
/var/log/openvpn/*.log {
daily
missingok
rotate 52
compress
delaycompress
notifempty
create 644 nobody nogroup
postrotate
systemctl reload openvpn-server@server.service > /dev/null 2>&1 || true
endscript
}
EOF
echo "日志轮转配置完成"
}
# 主函数
main() {
case "$1" in
"start")
create_systemd_service
create_log_directory
configure_log_rotation
start_openvpn_service
;;
"stop")
stop_service
;;
"restart")
restart_service
;;
"status")
check_service_status
;;
"install")
create_systemd_service
create_log_directory
configure_log_rotation
echo "OpenVPN服务安装完成,使用 'start' 参数启动服务"
;;
*)
echo "用法: $0 {start|stop|restart|status|install}"
echo " start - 启动OpenVPN服务"
echo " stop - 停止OpenVPN服务"
echo " restart - 重启OpenVPN服务"
echo " status - 查看服务状态"
echo " install - 安装服务配置"
exit 1
;;
esac
}
# 执行主函数
main "$@"
4.4 监控与日志
4.4.1 实时监控脚本
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
OpenVPN服务端实时监控系统
监控连接状态、性能指标和系统资源
"""
import os
import time
import json
import psutil
import subprocess
from datetime import datetime
from typing import Dict, List, Optional
class OpenVPNMonitor:
"""OpenVPN监控器"""
def __init__(self,
status_file: str = "/var/log/openvpn/openvpn-status.log",
log_file: str = "/var/log/openvpn/openvpn.log"):
self.status_file = status_file
self.log_file = log_file
self.monitoring = False
def get_service_status(self) -> Dict[str, any]:
"""获取服务状态"""
try:
result = subprocess.run(
['systemctl', 'is-active', 'openvpn-server@server.service'],
capture_output=True, text=True
)
is_active = result.stdout.strip() == 'active'
if is_active:
# 获取服务详细信息
status_result = subprocess.run(
['systemctl', 'status', 'openvpn-server@server.service'],
capture_output=True, text=True
)
return {
'active': True,
'status': 'running',
'details': status_result.stdout
}
else:
return {
'active': False,
'status': 'stopped',
'details': None
}
except Exception as e:
return {
'active': False,
'status': 'error',
'error': str(e)
}
def parse_status_file(self) -> Dict[str, any]:
"""解析状态文件"""
if not os.path.exists(self.status_file):
return {'error': '状态文件不存在'}
try:
with open(self.status_file, 'r') as f:
content = f.read()
lines = content.strip().split('\n')
clients = []
routing_table = []
section = None
for line in lines:
line = line.strip()
if line.startswith('OpenVPN CLIENT LIST'):
section = 'clients'
continue
elif line.startswith('ROUTING TABLE'):
section = 'routing'
continue
elif line.startswith('GLOBAL STATS'):
section = 'stats'
continue
elif line.startswith('END'):
section = None
continue
if section == 'clients' and ',' in line:
parts = line.split(',')
if len(parts) >= 5:
clients.append({
'name': parts[0],
'real_address': parts[1],
'virtual_address': parts[2],
'bytes_received': int(parts[3]) if parts[3].isdigit() else 0,
'bytes_sent': int(parts[4]) if parts[4].isdigit() else 0,
'connected_since': parts[5] if len(parts) > 5 else 'Unknown'
})
elif section == 'routing' and ',' in line:
parts = line.split(',')
if len(parts) >= 4:
routing_table.append({
'virtual_address': parts[0],
'common_name': parts[1],
'real_address': parts[2],
'last_ref': parts[3]
})
return {
'clients': clients,
'routing_table': routing_table,
'client_count': len(clients),
'timestamp': datetime.now().isoformat()
}
except Exception as e:
return {'error': f'解析状态文件失败: {str(e)}'}
def get_system_metrics(self) -> Dict[str, any]:
"""获取系统指标"""
try:
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
# 内存使用情况
memory = psutil.virtual_memory()
# 磁盘使用情况
disk = psutil.disk_usage('/')
# 网络统计
network = psutil.net_io_counters()
# 负载平均值
load_avg = os.getloadavg() if hasattr(os, 'getloadavg') else (0, 0, 0)
return {
'cpu': {
'percent': cpu_percent,
'count': psutil.cpu_count()
},
'memory': {
'total': memory.total,
'available': memory.available,
'percent': memory.percent,
'used': memory.used
},
'disk': {
'total': disk.total,
'used': disk.used,
'free': disk.free,
'percent': (disk.used / disk.total) * 100
},
'network': {
'bytes_sent': network.bytes_sent,
'bytes_recv': network.bytes_recv,
'packets_sent': network.packets_sent,
'packets_recv': network.packets_recv
},
'load_average': {
'1min': load_avg[0],
'5min': load_avg[1],
'15min': load_avg[2]
},
'timestamp': datetime.now().isoformat()
}
except Exception as e:
return {'error': f'获取系统指标失败: {str(e)}'}
def get_openvpn_process_info(self) -> Dict[str, any]:
"""获取OpenVPN进程信息"""
try:
for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'cpu_percent', 'memory_info']):
if 'openvpn' in proc.info['name'].lower():
return {
'pid': proc.info['pid'],
'name': proc.info['name'],
'cmdline': ' '.join(proc.info['cmdline']),
'cpu_percent': proc.info['cpu_percent'],
'memory_mb': proc.info['memory_info'].rss / 1024 / 1024,
'status': proc.status()
}
return {'error': 'OpenVPN进程未找到'}
except Exception as e:
return {'error': f'获取进程信息失败: {str(e)}'}
def generate_report(self) -> Dict[str, any]:
"""生成监控报告"""
report = {
'timestamp': datetime.now().isoformat(),
'service_status': self.get_service_status(),
'connection_status': self.parse_status_file(),
'system_metrics': self.get_system_metrics(),
'process_info': self.get_openvpn_process_info()
}
return report
def start_monitoring(self, interval: int = 30, output_file: Optional[str] = None):
"""开始实时监控"""
self.monitoring = True
print(f"开始监控OpenVPN服务,间隔: {interval}秒")
print("按 Ctrl+C 停止监控")
try:
while self.monitoring:
report = self.generate_report()
# 显示简要信息
self._display_summary(report)
# 保存详细报告
if output_file:
self._save_report(report, output_file)
time.sleep(interval)
except KeyboardInterrupt:
print("\n监控已停止")
self.monitoring = False
def _display_summary(self, report: Dict[str, any]):
"""显示监控摘要"""
os.system('clear' if os.name == 'posix' else 'cls')
print("=" * 60)
print(f"OpenVPN 服务监控 - {report['timestamp']}")
print("=" * 60)
# 服务状态
service = report['service_status']
status_icon = "✓" if service.get('active') else "✗"
print(f"服务状态: {status_icon} {service.get('status', 'unknown')}")
# 连接信息
connections = report['connection_status']
if 'clients' in connections:
print(f"在线客户端: {connections['client_count']}")
for client in connections['clients'][:5]: # 显示前5个客户端
print(f" - {client['name']} ({client['virtual_address']})")
# 系统指标
metrics = report['system_metrics']
if 'cpu' in metrics:
print(f"CPU使用率: {metrics['cpu']['percent']:.1f}%")
print(f"内存使用率: {metrics['memory']['percent']:.1f}%")
print(f"磁盘使用率: {metrics['disk']['percent']:.1f}%")
# 进程信息
process = report['process_info']
if 'pid' in process:
print(f"OpenVPN进程: PID {process['pid']}, 内存 {process['memory_mb']:.1f}MB")
print("\n按 Ctrl+C 停止监控...")
def _save_report(self, report: Dict[str, any], filename: str):
"""保存监控报告"""
try:
with open(filename, 'a') as f:
f.write(json.dumps(report) + '\n')
except Exception as e:
print(f"保存报告失败: {e}")
# 使用示例
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='OpenVPN服务监控')
parser.add_argument('--interval', type=int, default=30, help='监控间隔(秒)')
parser.add_argument('--output', type=str, help='输出文件路径')
parser.add_argument('--once', action='store_true', help='只运行一次')
args = parser.parse_args()
monitor = OpenVPNMonitor()
if args.once:
# 生成一次性报告
report = monitor.generate_report()
print(json.dumps(report, indent=2, ensure_ascii=False))
else:
# 开始实时监控
monitor.start_monitoring(args.interval, args.output)
本章小结
本章详细介绍了OpenVPN服务端的配置与部署,包括:
- 基础配置:服务端配置文件的结构和关键参数设置
- 网络配置:不同网络拓扑的选择和路由配置
- DNS配置:DNS服务器和域名解析的配置管理
- 服务管理:systemd服务的创建、启动和管理
- 监控系统:实时监控连接状态和系统性能
通过本章的学习,您应该能够: - 独立配置OpenVPN服务端 - 选择合适的网络拓扑 - 配置路由和DNS - 管理OpenVPN服务 - 监控服务运行状态
下章预告
下一章将介绍客户端配置与连接,包括不同平台的客户端配置、连接脚本的编写以及连接问题的排查方法。