本章概述

本章将详细介绍OpenVPN服务端的配置与部署,包括服务端配置文件的详细设置、网络拓扑的选择、路由配置、用户管理以及服务的启动与管理。通过本章的学习,您将能够独立搭建和配置一个功能完整的OpenVPN服务端。

flowchart TD
    A[服务端配置] --> B[基础配置]
    A --> C[网络配置]
    A --> D[安全配置]
    A --> E[用户管理]
    
    B --> B1[监听端口]
    B --> B2[协议选择]
    B --> B3[设备类型]
    
    C --> C1[IP地址池]
    C --> C2[路由推送]
    C --> C3[DNS配置]
    
    D --> D1[证书配置]
    D --> D2[加密设置]
    D --> D3[认证方式]
    
    E --> E1[客户端管理]
    E --> E2[权限控制]
    E --> E3[连接限制]

4.1 服务端基础配置

4.1.1 配置文件结构

OpenVPN服务端配置文件通常命名为server.conf,包含以下主要部分:

#!/bin/bash
# 创建服务端配置文件
cat > /etc/openvpn/server/server.conf << 'EOF'
# OpenVPN服务端配置文件
# 基础配置部分

#============================================
# 基本设置
#============================================

# 监听端口(默认1194)
port 1194

# 协议类型(udp或tcp)
proto udp

# 设备类型(tun或tap)
dev tun

# CA根证书
ca /etc/openvpn/server/ca.crt

# 服务端证书
cert /etc/openvpn/server/server.crt

# 服务端私钥
key /etc/openvpn/server/server.key

# Diffie-Hellman参数文件
dh /etc/openvpn/server/dh.pem

#============================================
# 网络配置
#============================================

# VPN网络地址池
server 10.8.0.0 255.255.255.0

# 保持连接状态
keepalive 10 120

# 数据压缩
comp-lzo

# 允许客户端之间通信
client-to-client

# 持久化选项
persist-key
persist-tun

#============================================
# 安全配置
#============================================

# TLS认证密钥
tls-auth /etc/openvpn/server/ta.key 0

# 加密算法
cipher AES-256-GCM

# 认证算法
auth SHA256

# TLS版本限制
tls-version-min 1.2

#============================================
# 日志配置
#============================================

# 状态文件
status /var/log/openvpn/openvpn-status.log

# 日志文件
log-append /var/log/openvpn/openvpn.log

# 日志级别(0-9)
verb 3

# 静默重复消息
mute 20

EOF

echo "服务端配置文件创建完成"

4.1.2 网络拓扑配置

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
OpenVPN网络拓扑配置管理器
支持不同的网络拓扑模式配置
"""

import os
import ipaddress
from typing import Dict, List, Optional

class NetworkTopologyManager:
    """网络拓扑配置管理器"""
    
    def __init__(self, config_dir: str = "/etc/openvpn/server"):
        self.config_dir = config_dir
        self.topology_configs = {
            'subnet': self._subnet_topology,
            'net30': self._net30_topology,
            'p2p': self._p2p_topology
        }
    
    def _subnet_topology(self, network: str, netmask: str) -> Dict[str, str]:
        """子网拓扑配置"""
        return {
            'topology': 'subnet',
            'server': f'{network} {netmask}',
            'description': '推荐用于大多数场景,支持客户端间通信'
        }
    
    def _net30_topology(self, network: str, netmask: str) -> Dict[str, str]:
        """Net30拓扑配置"""
        return {
            'topology': 'net30',
            'server': f'{network} {netmask}',
            'description': '传统模式,每个客户端使用/30子网'
        }
    
    def _p2p_topology(self, network: str, netmask: str) -> Dict[str, str]:
        """点对点拓扑配置"""
        return {
            'topology': 'p2p',
            'server': f'{network} {netmask}',
            'description': '点对点模式,适用于简单连接'
        }
    
    def generate_topology_config(self, 
                               topology_type: str,
                               network: str = "10.8.0.0",
                               netmask: str = "255.255.255.0") -> str:
        """生成网络拓扑配置"""
        if topology_type not in self.topology_configs:
            raise ValueError(f"不支持的拓扑类型: {topology_type}")
        
        config = self.topology_configs[topology_type](network, netmask)
        
        config_lines = [
            f"# 网络拓扑配置 - {config['description']}",
            f"topology {config['topology']}",
            f"server {config['server']}",
            ""
        ]
        
        return "\n".join(config_lines)
    
    def calculate_client_capacity(self, network: str, netmask: str, topology: str) -> int:
        """计算客户端容量"""
        try:
            net = ipaddress.IPv4Network(f"{network}/{netmask}", strict=False)
            
            if topology == 'subnet':
                # 子网模式:总地址数 - 网络地址 - 广播地址 - 服务器地址
                return net.num_addresses - 3
            elif topology == 'net30':
                # Net30模式:每4个地址分配给一个客户端
                return (net.num_addresses - 4) // 4
            elif topology == 'p2p':
                # 点对点模式:只支持一个客户端
                return 1
            else:
                return 0
        except Exception as e:
            print(f"计算客户端容量时出错: {e}")
            return 0
    
    def validate_network_config(self, network: str, netmask: str) -> bool:
        """验证网络配置"""
        try:
            ipaddress.IPv4Network(f"{network}/{netmask}", strict=False)
            return True
        except Exception:
            return False
    
    def generate_routing_config(self, routes: List[Dict[str, str]]) -> str:
        """生成路由配置"""
        config_lines = ["# 路由配置"]
        
        for route in routes:
            network = route.get('network')
            netmask = route.get('netmask')
            gateway = route.get('gateway', '')
            
            if network and netmask:
                if gateway:
                    config_lines.append(f"route {network} {netmask} {gateway}")
                else:
                    config_lines.append(f"route {network} {netmask}")
        
        config_lines.append("")
        return "\n".join(config_lines)
    
    def generate_push_config(self, push_options: List[str]) -> str:
        """生成推送配置"""
        config_lines = ["# 推送给客户端的配置"]
        
        for option in push_options:
            config_lines.append(f"push \"{option}\"")
        
        config_lines.append("")
        return "\n".join(config_lines)

# 使用示例
if __name__ == "__main__":
    manager = NetworkTopologyManager()
    
    # 生成子网拓扑配置
    subnet_config = manager.generate_topology_config('subnet', '10.8.0.0', '255.255.255.0')
    print("子网拓扑配置:")
    print(subnet_config)
    
    # 计算客户端容量
    capacity = manager.calculate_client_capacity('10.8.0.0', '255.255.255.0', 'subnet')
    print(f"客户端容量: {capacity}")
    
    # 生成路由配置
    routes = [
        {'network': '192.168.1.0', 'netmask': '255.255.255.0'},
        {'network': '172.16.0.0', 'netmask': '255.255.0.0'}
    ]
    routing_config = manager.generate_routing_config(routes)
    print("路由配置:")
    print(routing_config)
    
    # 生成推送配置
    push_options = [
        'redirect-gateway def1 bypass-dhcp',
        'dhcp-option DNS 8.8.8.8',
        'dhcp-option DNS 8.8.4.4'
    ]
    push_config = manager.generate_push_config(push_options)
    print("推送配置:")
    print(push_config)

4.2 高级网络配置

4.2.1 路由配置

#!/bin/bash
# OpenVPN路由配置脚本

# 配置服务端路由
configure_server_routing() {
    local config_file="/etc/openvpn/server/server.conf"
    
    echo "配置服务端路由..."
    
    # 添加路由配置到服务端配置文件
    cat >> "$config_file" << 'EOF'

#============================================
# 路由配置
#============================================

# 推送默认网关(全流量通过VPN)
push "redirect-gateway def1 bypass-dhcp"

# 推送DNS服务器
push "dhcp-option DNS 8.8.8.8"
push "dhcp-option DNS 8.8.4.4"

# 推送特定路由
push "route 192.168.1.0 255.255.255.0"
push "route 172.16.0.0 255.255.0.0"

# 服务端本地路由
route 10.8.0.0 255.255.255.0

# 客户端特定路由(通过客户端配置目录)
client-config-dir /etc/openvpn/server/ccd

EOF

    echo "路由配置完成"
}

# 创建客户端配置目录
create_client_config_dir() {
    local ccd_dir="/etc/openvpn/server/ccd"
    
    echo "创建客户端配置目录..."
    mkdir -p "$ccd_dir"
    
    # 示例:为特定客户端配置固定IP
    cat > "$ccd_dir/client1" << 'EOF'
# 客户端client1的特定配置
ifconfig-push 10.8.0.100 10.8.0.101

# 为此客户端推送特定路由
push "route 192.168.100.0 255.255.255.0"
EOF

    cat > "$ccd_dir/client2" << 'EOF'
# 客户端client2的特定配置
ifconfig-push 10.8.0.102 10.8.0.103

# 允许此客户端访问内网
iroute 192.168.2.0 255.255.255.0
EOF

    echo "客户端配置目录创建完成"
}

# 配置IP转发
enable_ip_forwarding() {
    echo "启用IP转发..."
    
    # 临时启用
    echo 1 > /proc/sys/net/ipv4/ip_forward
    
    # 永久启用
    if ! grep -q "net.ipv4.ip_forward=1" /etc/sysctl.conf; then
        echo "net.ipv4.ip_forward=1" >> /etc/sysctl.conf
    fi
    
    # 应用配置
    sysctl -p
    
    echo "IP转发已启用"
}

# 配置防火墙规则
configure_firewall() {
    echo "配置防火墙规则..."
    
    # 获取外网接口名称
    local external_interface=$(ip route | grep default | awk '{print $5}' | head -n1)
    
    if [ -z "$external_interface" ]; then
        echo "警告:无法自动检测外网接口,请手动配置"
        external_interface="eth0"
    fi
    
    echo "外网接口: $external_interface"
    
    # 配置iptables规则
    iptables -t nat -A POSTROUTING -s 10.8.0.0/24 -o "$external_interface" -j MASQUERADE
    iptables -A INPUT -i tun+ -j ACCEPT
    iptables -A FORWARD -i tun+ -j ACCEPT
    iptables -A FORWARD -i tun+ -o "$external_interface" -m state --state RELATED,ESTABLISHED -j ACCEPT
    iptables -A FORWARD -i "$external_interface" -o tun+ -m state --state RELATED,ESTABLISHED -j ACCEPT
    
    # 保存iptables规则
    if command -v iptables-save > /dev/null; then
        iptables-save > /etc/iptables/rules.v4
    fi
    
    echo "防火墙规则配置完成"
}

# 主函数
main() {
    echo "开始配置OpenVPN服务端路由..."
    
    configure_server_routing
    create_client_config_dir
    enable_ip_forwarding
    configure_firewall
    
    echo "OpenVPN服务端路由配置完成!"
}

# 执行主函数
main

4.2.2 DNS配置

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
OpenVPN DNS配置管理器
管理DNS服务器配置和域名解析
"""

import os
import re
from typing import List, Dict, Optional

class DNSConfigManager:
    """DNS配置管理器"""
    
    def __init__(self, config_file: str = "/etc/openvpn/server/server.conf"):
        self.config_file = config_file
        self.dns_servers = {
            'google': ['8.8.8.8', '8.8.4.4'],
            'cloudflare': ['1.1.1.1', '1.0.0.1'],
            'opendns': ['208.67.222.222', '208.67.220.220'],
            'quad9': ['9.9.9.9', '149.112.112.112']
        }
    
    def add_dns_servers(self, provider: str = 'google', custom_servers: Optional[List[str]] = None) -> str:
        """添加DNS服务器配置"""
        if custom_servers:
            servers = custom_servers
        elif provider in self.dns_servers:
            servers = self.dns_servers[provider]
        else:
            raise ValueError(f"不支持的DNS提供商: {provider}")
        
        config_lines = ["# DNS服务器配置"]
        
        for server in servers:
            if self._validate_ip(server):
                config_lines.append(f'push "dhcp-option DNS {server}"')
            else:
                print(f"警告:无效的DNS服务器地址 {server}")
        
        config_lines.append("")
        return "\n".join(config_lines)
    
    def add_dns_domains(self, domains: List[str]) -> str:
        """添加DNS域名配置"""
        config_lines = ["# DNS域名配置"]
        
        for domain in domains:
            if self._validate_domain(domain):
                config_lines.append(f'push "dhcp-option DOMAIN {domain}"')
            else:
                print(f"警告:无效的域名 {domain}")
        
        config_lines.append("")
        return "\n".join(config_lines)
    
    def add_dns_search_domains(self, search_domains: List[str]) -> str:
        """添加DNS搜索域配置"""
        config_lines = ["# DNS搜索域配置"]
        
        for domain in search_domains:
            if self._validate_domain(domain):
                config_lines.append(f'push "dhcp-option DOMAIN-SEARCH {domain}"')
            else:
                print(f"警告:无效的搜索域 {domain}")
        
        config_lines.append("")
        return "\n".join(config_lines)
    
    def generate_dns_config(self, 
                          provider: str = 'google',
                          custom_servers: Optional[List[str]] = None,
                          domains: Optional[List[str]] = None,
                          search_domains: Optional[List[str]] = None) -> str:
        """生成完整的DNS配置"""
        config_parts = []
        
        # DNS服务器配置
        dns_config = self.add_dns_servers(provider, custom_servers)
        config_parts.append(dns_config)
        
        # 域名配置
        if domains:
            domain_config = self.add_dns_domains(domains)
            config_parts.append(domain_config)
        
        # 搜索域配置
        if search_domains:
            search_config = self.add_dns_search_domains(search_domains)
            config_parts.append(search_config)
        
        return "\n".join(config_parts)
    
    def _validate_ip(self, ip: str) -> bool:
        """验证IP地址格式"""
        pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
        return bool(re.match(pattern, ip))
    
    def _validate_domain(self, domain: str) -> bool:
        """验证域名格式"""
        pattern = r'^(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+'r'[a-zA-Z]{2,}$'
        return bool(re.match(pattern, domain)) and len(domain) <= 253
    
    def create_dns_test_script(self) -> str:
        """创建DNS测试脚本"""
        script_content = '''#!/bin/bash
# DNS配置测试脚本

echo "=== OpenVPN DNS配置测试 ==="

# 测试DNS解析
test_dns_resolution() {
    local test_domains=("google.com" "github.com" "stackoverflow.com")
    
    echo "测试DNS解析..."
    for domain in "${test_domains[@]}"; do
        if nslookup "$domain" > /dev/null 2>&1; then
            echo "✓ $domain 解析成功"
        else
            echo "✗ $domain 解析失败"
        fi
    done
}

# 检查DNS服务器连通性
test_dns_connectivity() {
    local dns_servers=("8.8.8.8" "1.1.1.1" "208.67.222.222")
    
    echo "\n测试DNS服务器连通性..."
    for server in "${dns_servers[@]}"; do
        if ping -c 1 -W 3 "$server" > /dev/null 2>&1; then
            echo "✓ $server 连通正常"
        else
            echo "✗ $server 连通失败"
        fi
    done
}

# 显示当前DNS配置
show_dns_config() {
    echo "\n当前DNS配置:"
    if [ -f /etc/resolv.conf ]; then
        cat /etc/resolv.conf
    else
        echo "未找到DNS配置文件"
    fi
}

# 执行测试
test_dns_resolution
test_dns_connectivity
show_dns_config

echo "\n=== DNS测试完成 ==="
'''
        return script_content
    
    def get_recommended_dns_config(self, use_case: str) -> Dict[str, any]:
        """获取推荐的DNS配置"""
        configs = {
            'general': {
                'provider': 'google',
                'description': '通用配置,适合大多数场景'
            },
            'privacy': {
                'provider': 'cloudflare',
                'description': '注重隐私保护的配置'
            },
            'security': {
                'provider': 'quad9',
                'description': '注重安全防护的配置'
            },
            'family': {
                'provider': 'opendns',
                'description': '家庭友好的配置,包含内容过滤'
            }
        }
        
        return configs.get(use_case, configs['general'])

# 使用示例
if __name__ == "__main__":
    dns_manager = DNSConfigManager()
    
    # 生成基本DNS配置
    basic_config = dns_manager.generate_dns_config('google')
    print("基本DNS配置:")
    print(basic_config)
    
    # 生成高级DNS配置
    advanced_config = dns_manager.generate_dns_config(
        provider='cloudflare',
        domains=['company.local'],
        search_domains=['internal.company.com', 'dev.company.com']
    )
    print("高级DNS配置:")
    print(advanced_config)
    
    # 创建DNS测试脚本
    test_script = dns_manager.create_dns_test_script()
    print("DNS测试脚本:")
    print(test_script[:200] + "...")
    
    # 获取推荐配置
    recommendation = dns_manager.get_recommended_dns_config('privacy')
    print(f"\n推荐配置: {recommendation}")

4.3 服务管理与启动

4.3.1 系统服务配置

#!/bin/bash
# OpenVPN服务管理脚本

# 创建systemd服务文件
create_systemd_service() {
    echo "创建OpenVPN systemd服务文件..."
    
    cat > /etc/systemd/system/openvpn-server@.service << 'EOF'
[Unit]
Description=OpenVPN service for %I
After=syslog.target network-online.target
Wants=network-online.target
Documentation=man:openvpn(8)
Documentation=https://community.openvpn.net/openvpn/wiki/Openvpn24ManPage
Documentation=https://community.openvpn.net/openvpn/wiki/HOWTO

[Service]
Type=notify
PrivateTmp=true
WorkingDirectory=/etc/openvpn/server
ExecStart=/usr/sbin/openvpn --status %t/openvpn-server/status-%i.log --status-version 2 --suppress-timestamps --config %i.conf
CapabilityBoundingSet=CAP_IPC_LOCK CAP_NET_ADMIN CAP_NET_BIND_SERVICE CAP_NET_RAW CAP_SETGID CAP_SETUID CAP_SYS_CHROOT CAP_DAC_OVERRIDE CAP_AUDIT_WRITE
LimitNPROC=100
DeviceAllow=/dev/null rw
DeviceAllow=/dev/net/tun rw
ProtectSystem=true
ProtectHome=true
KillMode=process
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target
EOF

    echo "systemd服务文件创建完成"
}

# 启动和启用服务
start_openvpn_service() {
    local config_name="server"
    
    echo "启动OpenVPN服务..."
    
    # 重新加载systemd配置
    systemctl daemon-reload
    
    # 启用服务(开机自启)
    systemctl enable openvpn-server@${config_name}.service
    
    # 启动服务
    systemctl start openvpn-server@${config_name}.service
    
    # 检查服务状态
    if systemctl is-active --quiet openvpn-server@${config_name}.service; then
        echo "✓ OpenVPN服务启动成功"
    else
        echo "✗ OpenVPN服务启动失败"
        systemctl status openvpn-server@${config_name}.service
        return 1
    fi
}

# 检查服务状态
check_service_status() {
    local config_name="server"
    
    echo "=== OpenVPN服务状态 ==="
    systemctl status openvpn-server@${config_name}.service
    
    echo "\n=== 服务日志 ==="
    journalctl -u openvpn-server@${config_name}.service --no-pager -n 20
    
    echo "\n=== 连接状态 ==="
    if [ -f /var/log/openvpn/openvpn-status.log ]; then
        cat /var/log/openvpn/openvpn-status.log
    else
        echo "状态文件不存在"
    fi
}

# 重启服务
restart_service() {
    local config_name="server"
    
    echo "重启OpenVPN服务..."
    systemctl restart openvpn-server@${config_name}.service
    
    sleep 3
    
    if systemctl is-active --quiet openvpn-server@${config_name}.service; then
        echo "✓ OpenVPN服务重启成功"
    else
        echo "✗ OpenVPN服务重启失败"
        return 1
    fi
}

# 停止服务
stop_service() {
    local config_name="server"
    
    echo "停止OpenVPN服务..."
    systemctl stop openvpn-server@${config_name}.service
    
    if ! systemctl is-active --quiet openvpn-server@${config_name}.service; then
        echo "✓ OpenVPN服务已停止"
    else
        echo "✗ OpenVPN服务停止失败"
        return 1
    fi
}

# 创建日志目录
create_log_directory() {
    echo "创建日志目录..."
    
    mkdir -p /var/log/openvpn
    mkdir -p /var/run/openvpn-server
    
    # 设置权限
    chown nobody:nogroup /var/log/openvpn
    chmod 755 /var/log/openvpn
    
    echo "日志目录创建完成"
}

# 配置日志轮转
configure_log_rotation() {
    echo "配置日志轮转..."
    
    cat > /etc/logrotate.d/openvpn << 'EOF'
/var/log/openvpn/*.log {
    daily
    missingok
    rotate 52
    compress
    delaycompress
    notifempty
    create 644 nobody nogroup
    postrotate
        systemctl reload openvpn-server@server.service > /dev/null 2>&1 || true
    endscript
}
EOF

    echo "日志轮转配置完成"
}

# 主函数
main() {
    case "$1" in
        "start")
            create_systemd_service
            create_log_directory
            configure_log_rotation
            start_openvpn_service
            ;;
        "stop")
            stop_service
            ;;
        "restart")
            restart_service
            ;;
        "status")
            check_service_status
            ;;
        "install")
            create_systemd_service
            create_log_directory
            configure_log_rotation
            echo "OpenVPN服务安装完成,使用 'start' 参数启动服务"
            ;;
        *)
            echo "用法: $0 {start|stop|restart|status|install}"
            echo "  start   - 启动OpenVPN服务"
            echo "  stop    - 停止OpenVPN服务"
            echo "  restart - 重启OpenVPN服务"
            echo "  status  - 查看服务状态"
            echo "  install - 安装服务配置"
            exit 1
            ;;
    esac
}

# 执行主函数
main "$@"

4.4 监控与日志

4.4.1 实时监控脚本

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
OpenVPN服务端实时监控系统
监控连接状态、性能指标和系统资源
"""

import os
import time
import json
import psutil
import subprocess
from datetime import datetime
from typing import Dict, List, Optional

class OpenVPNMonitor:
    """OpenVPN监控器"""
    
    def __init__(self, 
                 status_file: str = "/var/log/openvpn/openvpn-status.log",
                 log_file: str = "/var/log/openvpn/openvpn.log"):
        self.status_file = status_file
        self.log_file = log_file
        self.monitoring = False
    
    def get_service_status(self) -> Dict[str, any]:
        """获取服务状态"""
        try:
            result = subprocess.run(
                ['systemctl', 'is-active', 'openvpn-server@server.service'],
                capture_output=True, text=True
            )
            
            is_active = result.stdout.strip() == 'active'
            
            if is_active:
                # 获取服务详细信息
                status_result = subprocess.run(
                    ['systemctl', 'status', 'openvpn-server@server.service'],
                    capture_output=True, text=True
                )
                
                return {
                    'active': True,
                    'status': 'running',
                    'details': status_result.stdout
                }
            else:
                return {
                    'active': False,
                    'status': 'stopped',
                    'details': None
                }
        except Exception as e:
            return {
                'active': False,
                'status': 'error',
                'error': str(e)
            }
    
    def parse_status_file(self) -> Dict[str, any]:
        """解析状态文件"""
        if not os.path.exists(self.status_file):
            return {'error': '状态文件不存在'}
        
        try:
            with open(self.status_file, 'r') as f:
                content = f.read()
            
            lines = content.strip().split('\n')
            
            clients = []
            routing_table = []
            
            section = None
            for line in lines:
                line = line.strip()
                
                if line.startswith('OpenVPN CLIENT LIST'):
                    section = 'clients'
                    continue
                elif line.startswith('ROUTING TABLE'):
                    section = 'routing'
                    continue
                elif line.startswith('GLOBAL STATS'):
                    section = 'stats'
                    continue
                elif line.startswith('END'):
                    section = None
                    continue
                
                if section == 'clients' and ',' in line:
                    parts = line.split(',')
                    if len(parts) >= 5:
                        clients.append({
                            'name': parts[0],
                            'real_address': parts[1],
                            'virtual_address': parts[2],
                            'bytes_received': int(parts[3]) if parts[3].isdigit() else 0,
                            'bytes_sent': int(parts[4]) if parts[4].isdigit() else 0,
                            'connected_since': parts[5] if len(parts) > 5 else 'Unknown'
                        })
                
                elif section == 'routing' and ',' in line:
                    parts = line.split(',')
                    if len(parts) >= 4:
                        routing_table.append({
                            'virtual_address': parts[0],
                            'common_name': parts[1],
                            'real_address': parts[2],
                            'last_ref': parts[3]
                        })
            
            return {
                'clients': clients,
                'routing_table': routing_table,
                'client_count': len(clients),
                'timestamp': datetime.now().isoformat()
            }
        
        except Exception as e:
            return {'error': f'解析状态文件失败: {str(e)}'}
    
    def get_system_metrics(self) -> Dict[str, any]:
        """获取系统指标"""
        try:
            # CPU使用率
            cpu_percent = psutil.cpu_percent(interval=1)
            
            # 内存使用情况
            memory = psutil.virtual_memory()
            
            # 磁盘使用情况
            disk = psutil.disk_usage('/')
            
            # 网络统计
            network = psutil.net_io_counters()
            
            # 负载平均值
            load_avg = os.getloadavg() if hasattr(os, 'getloadavg') else (0, 0, 0)
            
            return {
                'cpu': {
                    'percent': cpu_percent,
                    'count': psutil.cpu_count()
                },
                'memory': {
                    'total': memory.total,
                    'available': memory.available,
                    'percent': memory.percent,
                    'used': memory.used
                },
                'disk': {
                    'total': disk.total,
                    'used': disk.used,
                    'free': disk.free,
                    'percent': (disk.used / disk.total) * 100
                },
                'network': {
                    'bytes_sent': network.bytes_sent,
                    'bytes_recv': network.bytes_recv,
                    'packets_sent': network.packets_sent,
                    'packets_recv': network.packets_recv
                },
                'load_average': {
                    '1min': load_avg[0],
                    '5min': load_avg[1],
                    '15min': load_avg[2]
                },
                'timestamp': datetime.now().isoformat()
            }
        
        except Exception as e:
            return {'error': f'获取系统指标失败: {str(e)}'}
    
    def get_openvpn_process_info(self) -> Dict[str, any]:
        """获取OpenVPN进程信息"""
        try:
            for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'cpu_percent', 'memory_info']):
                if 'openvpn' in proc.info['name'].lower():
                    return {
                        'pid': proc.info['pid'],
                        'name': proc.info['name'],
                        'cmdline': ' '.join(proc.info['cmdline']),
                        'cpu_percent': proc.info['cpu_percent'],
                        'memory_mb': proc.info['memory_info'].rss / 1024 / 1024,
                        'status': proc.status()
                    }
            
            return {'error': 'OpenVPN进程未找到'}
        
        except Exception as e:
            return {'error': f'获取进程信息失败: {str(e)}'}
    
    def generate_report(self) -> Dict[str, any]:
        """生成监控报告"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'service_status': self.get_service_status(),
            'connection_status': self.parse_status_file(),
            'system_metrics': self.get_system_metrics(),
            'process_info': self.get_openvpn_process_info()
        }
        
        return report
    
    def start_monitoring(self, interval: int = 30, output_file: Optional[str] = None):
        """开始实时监控"""
        self.monitoring = True
        
        print(f"开始监控OpenVPN服务,间隔: {interval}秒")
        print("按 Ctrl+C 停止监控")
        
        try:
            while self.monitoring:
                report = self.generate_report()
                
                # 显示简要信息
                self._display_summary(report)
                
                # 保存详细报告
                if output_file:
                    self._save_report(report, output_file)
                
                time.sleep(interval)
        
        except KeyboardInterrupt:
            print("\n监控已停止")
            self.monitoring = False
    
    def _display_summary(self, report: Dict[str, any]):
        """显示监控摘要"""
        os.system('clear' if os.name == 'posix' else 'cls')
        
        print("=" * 60)
        print(f"OpenVPN 服务监控 - {report['timestamp']}")
        print("=" * 60)
        
        # 服务状态
        service = report['service_status']
        status_icon = "✓" if service.get('active') else "✗"
        print(f"服务状态: {status_icon} {service.get('status', 'unknown')}")
        
        # 连接信息
        connections = report['connection_status']
        if 'clients' in connections:
            print(f"在线客户端: {connections['client_count']}")
            for client in connections['clients'][:5]:  # 显示前5个客户端
                print(f"  - {client['name']} ({client['virtual_address']})")
        
        # 系统指标
        metrics = report['system_metrics']
        if 'cpu' in metrics:
            print(f"CPU使用率: {metrics['cpu']['percent']:.1f}%")
            print(f"内存使用率: {metrics['memory']['percent']:.1f}%")
            print(f"磁盘使用率: {metrics['disk']['percent']:.1f}%")
        
        # 进程信息
        process = report['process_info']
        if 'pid' in process:
            print(f"OpenVPN进程: PID {process['pid']}, 内存 {process['memory_mb']:.1f}MB")
        
        print("\n按 Ctrl+C 停止监控...")
    
    def _save_report(self, report: Dict[str, any], filename: str):
        """保存监控报告"""
        try:
            with open(filename, 'a') as f:
                f.write(json.dumps(report) + '\n')
        except Exception as e:
            print(f"保存报告失败: {e}")

# 使用示例
if __name__ == "__main__":
    import argparse
    
    parser = argparse.ArgumentParser(description='OpenVPN服务监控')
    parser.add_argument('--interval', type=int, default=30, help='监控间隔(秒)')
    parser.add_argument('--output', type=str, help='输出文件路径')
    parser.add_argument('--once', action='store_true', help='只运行一次')
    
    args = parser.parse_args()
    
    monitor = OpenVPNMonitor()
    
    if args.once:
        # 生成一次性报告
        report = monitor.generate_report()
        print(json.dumps(report, indent=2, ensure_ascii=False))
    else:
        # 开始实时监控
        monitor.start_monitoring(args.interval, args.output)

本章小结

本章详细介绍了OpenVPN服务端的配置与部署,包括:

  1. 基础配置:服务端配置文件的结构和关键参数设置
  2. 网络配置:不同网络拓扑的选择和路由配置
  3. DNS配置:DNS服务器和域名解析的配置管理
  4. 服务管理:systemd服务的创建、启动和管理
  5. 监控系统:实时监控连接状态和系统性能

通过本章的学习,您应该能够: - 独立配置OpenVPN服务端 - 选择合适的网络拓扑 - 配置路由和DNS - 管理OpenVPN服务 - 监控服务运行状态

下章预告

下一章将介绍客户端配置与连接,包括不同平台的客户端配置、连接脚本的编写以及连接问题的排查方法。