9.1 Ansible Vault 加密管理

9.1.1 Vault 基础概念

Ansible Vault 是 Ansible 内置的加密功能,用于保护敏感数据如密码、密钥、证书等。Vault 使用 AES256 加密算法,确保敏感信息的安全性。

Vault 的用途:

- 加密密码和 API 密钥
- 保护 SSL 证书和私钥
- 加密配置文件中的敏感部分
- 安全地存储数据库连接信息
- 保护云服务凭证

9.1.2 Vault 基本操作

# Create a new encrypted file
ansible-vault create secrets.yml

# Edit an encrypted file
ansible-vault edit secrets.yml

# View the contents of an encrypted file
ansible-vault view secrets.yml

# Encrypt an existing plaintext file
ansible-vault encrypt plaintext.yml

# Decrypt a file
ansible-vault decrypt secrets.yml

# Change the password of an encrypted file
ansible-vault rekey secrets.yml

# Encrypt a single string
# NOTE(review): the secret is passed as a command-line argument here, so it
# ends up in the shell history; reading it from a prompt or stdin is
# presumably safer — confirm the options supported by the installed version.
ansible-vault encrypt_string 'secret_password' --name 'db_password'

# Use a password file instead of an interactive prompt
ansible-vault create --vault-password-file ~/.vault_pass secrets.yml

9.1.3 Vault 文件示例

# group_vars/production/vault.yml
# Encrypted variables for the production environment (file as stored on disk)
$ANSIBLE_VAULT;1.1;AES256
66386439653162336464643965393835663365356332643732393064663533343061613431313265
6664373764613963653965383138316364616533373734360a653638643435666633633964366235
63386435626139623965353965653266306164626565653637343334376165323030643936653030
3438626666666137650a353638643435666633633964366235633864356261396239653539653532
66306164626565653637343334376165323030643936653030343862666666613765

# Contents after decryption. Every key carries a vault_ prefix so that the
# plain vars file can reference it under an unprefixed name.
---
vault_mysql_root_password: super_secret_password
vault_api_key: abc123def456ghi789
vault_ssl_private_key: |
  -----BEGIN PRIVATE KEY-----
  MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC7...
  -----END PRIVATE KEY-----
vault_database_url: postgresql://user:pass@db.example.com:5432/myapp
# group_vars/production/vars.yml
# Plain (unencrypted) variables file that references the encrypted variables
---
# Database and credential settings: each value is an indirection to the
# matching vault_* variable
mysql_root_password: "{{ vault_mysql_root_password }}"
api_key: "{{ vault_api_key }}"
ssl_private_key: "{{ vault_ssl_private_key }}"
database_url: "{{ vault_database_url }}"

# Other, non-sensitive settings
mysql_port: 3306
mysql_host: localhost
api_endpoint: https://api.example.com

9.1.4 多密码文件管理

# Create one password file per environment.
# The subshell sets umask 077 first, so the files are created owner-only and
# are never readable by other users, not even between creation and chmod.
(
  umask 077
  echo "production_vault_password" > ~/.vault_pass_prod
  echo "staging_vault_password" > ~/.vault_pass_staging
  echo "development_vault_password" > ~/.vault_pass_dev
)

# Tighten permissions as well, in case any of the files already existed
chmod 600 ~/.vault_pass_*

# Pick the password file that matches the target environment
ansible-playbook -i inventory/production site.yml --vault-password-file ~/.vault_pass_prod
ansible-playbook -i inventory/staging site.yml --vault-password-file ~/.vault_pass_staging
#!/usr/bin/env python3
# scripts/vault_password.py
# 动态密码脚本

import os
import sys
import subprocess

def get_vault_password():
    """Return the Vault password for the current environment.

    The environment name is read from ``ANSIBLE_ENVIRONMENT`` (default
    ``development``). Sources are tried in this order:

    1. the ``pass`` password manager, entry ``ansible/vault/<environment>``;
    2. the ``ANSIBLE_VAULT_PASSWORD_<ENVIRONMENT>`` environment variable;
    3. an interactive prompt.

    Returns:
        str: the password with surrounding whitespace stripped when it comes
        from ``pass``; otherwise the environment value or the typed input.
    """
    environment = os.environ.get('ANSIBLE_ENVIRONMENT', 'development')

    try:
        # Primary source: the `pass` password manager
        result = subprocess.run(
            ['pass', f'ansible/vault/{environment}'],
            capture_output=True,
            text=True,
            check=True
        )
        return result.stdout.strip()
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: `pass` ran but exited non-zero.
        # OSError (e.g. FileNotFoundError): `pass` is not installed or cannot
        # be executed. Both cases must reach the fallbacks below.
        password = os.environ.get(f'ANSIBLE_VAULT_PASSWORD_{environment.upper()}')
        if password:
            return password

        # Last resort: ask the user
        import getpass
        return getpass.getpass(f'Vault password for {environment}: ')


if __name__ == '__main__':
    # Ansible reads the password from this script's standard output
    print(get_vault_password())
# Make the script executable. Ansible only runs a --vault-password-file as a
# program when it is executable; otherwise the file's text is used as the
# password.
chmod +x scripts/vault_password.py

# Use the dynamic password script
ansible-playbook site.yml --vault-password-file scripts/vault_password.py

# Select the environment through an environment variable
export ANSIBLE_ENVIRONMENT=production
ansible-playbook site.yml --vault-password-file scripts/vault_password.py

9.1.5 内联加密变量

# playbooks/deploy.yml
# Uses inline vault-encrypted variables directly inside a playbook
---
- name: Deploy application with encrypted secrets
  hosts: webservers
  vars:
    # Inline-encrypted database password
    db_password: !vault |
      $ANSIBLE_VAULT;1.1;AES256
      66386439653162336464643965393835663365356332643732393064663533343061613431313265
      6664373764613963653965383138316364616533373734360a653638643435666633633964366235
      63386435626139623965353965653266306164626565653637343334376165323030643936653030
      3438626666666137650a353638643435666633633964366235633864356261396239653539653532
      66306164626565653637343334376165323030643936653030343862666666613765

    # Inline-encrypted API secret
    api_secret: !vault |
      $ANSIBLE_VAULT;1.1;AES256
      33663365356332643732393064663533343061613431313265366437376461396365396538313831
      6364616533373734360a653638643435666633633964366235633864356261396239653539653532
      66306164626565653637343334376165323030643936653030343862666666613765

  tasks:
    - name: Configure database connection
      template:
        src: database.conf.j2
        dest: /etc/myapp/database.conf
        mode: '0600'
      vars:
        database_password: "{{ db_password }}"
      # Keep the decrypted secret out of task output, logs and --diff
      no_log: true

    - name: Set API configuration
      lineinfile:
        path: /etc/myapp/api.conf
        regexp: '^api_secret='
        line: "api_secret={{ api_secret }}"
        mode: '0600'
      # The rendered line contains the decrypted secret, so suppress logging
      no_log: true

9.2 动态包含和导入

9.2.1 include vs import 的区别

# Static imports (resolved when the playbook is parsed)
- import_tasks: setup.yml
- import_playbook: common.yml
# import_role has no free-form syntax; the role is given through `name`
- import_role:
    name: nginx

# Dynamic includes (resolved at run time, so facts and loops can be used)
- include_tasks: "{{ ansible_os_family }}.yml"
- include_role:
    name: "{{ web_server_type }}"

# Playbooks have no dynamic form: there is no `include_playbook`, only
# import_playbook. The file name can still be templated, but the variable has
# to be known at parse time (for example via --extra-vars).
- import_playbook: "{{ environment }}.yml"

9.2.2 动态任务包含

# playbooks/main.yml
# Main playbook: picks task files at run time based on facts and variables
---
- name: Dynamic task inclusion example
  hosts: all
  vars:
    # Maps ansible_os_family to the task file for that OS family
    os_tasks:
      RedHat: redhat_tasks.yml
      Debian: debian_tasks.yml
      Windows: windows_tasks.yml
  
  tasks:
    # Skipped for OS families that have no entry in os_tasks
    - name: Include OS-specific tasks
      include_tasks: "{{ os_tasks[ansible_os_family] }}"
      when: ansible_os_family in os_tasks
    
    - name: Include tasks based on server role
      include_tasks: "roles/{{ server_role }}/tasks/main.yml"
      when: server_role is defined
    
    # NOTE(review): `environment` is also an Ansible playbook keyword, so it
    # is presumably not a safe variable name — confirm and consider renaming.
    - name: Include environment-specific tasks
      include_tasks: "tasks/{{ environment }}/{{ item }}.yml"
      loop:
        - security
        - monitoring
        - backup
      when: environment in ['production', 'staging']
# tasks/redhat_tasks.yml
# Tasks specific to RedHat-family systems
---
- name: Install EPEL repository
  yum:
    state: present
    name: epel-release

# Only runs when update_packages is set to a true value
- name: Update all packages
  when: update_packages | default(false)
  yum:
    state: latest
    name: "*"

- name: Install RedHat specific packages
  vars:
    redhat_packages:
      - firewalld
      - policycoreutils-python-utils
      - selinux-policy-targeted
  yum:
    state: present
    name: "{{ redhat_packages }}"

# Defaults to enforcing mode unless selinux_state is provided
- name: Configure SELinux
  selinux:
    state: "{{ selinux_state | default('enforcing') }}"
    policy: targeted
  notify: reboot system

- name: Start and enable firewalld
  systemd:
    name: firewalld
    enabled: true
    state: started
# tasks/debian_tasks.yml
# Tasks specific to Debian-family systems
---
# Refresh the apt cache only if it is older than one hour
- name: Update package cache
  apt:
    update_cache: true
    cache_valid_time: 3600

# Only runs when update_packages is set to a true value
- name: Upgrade all packages
  apt:
    upgrade: dist
  when: update_packages | default(false)

- name: Install Debian specific packages
  apt:
    name: "{{ debian_packages }}"
    state: present
  vars:
    debian_packages:
      - ufw
      - apparmor
      - apparmor-utils

# The SSH rule must exist BEFORE the firewall is enabled with a default-deny
# policy; otherwise Ansible can lock itself out of the host.
- name: Allow SSH through firewall
  ufw:
    rule: allow
    port: '22'
    proto: tcp

- name: Configure UFW firewall
  ufw:
    state: enabled
    policy: deny
    direction: incoming

9.2.3 条件性角色包含

# playbooks/infrastructure.yml
# Infrastructure deployment playbook: selects roles at run time
---
- name: Deploy infrastructure components
  hosts: all
  vars:
    # Maps a service component name to the role that provides it
    service_roles:
      web: nginx
      api: nodejs
      database: mysql
      cache: redis
      queue: rabbitmq
      monitoring: prometheus
    
    # Extra roles applied per environment
    environment_roles:
      production:
        - security_hardening
        - log_shipping
        - backup
      staging:
        - development_tools
      development:
        - debug_tools
        - test_data
  
  tasks:
    - name: Include base system role
      include_role:
        name: common
      tags: [common, base]
    
    # server_services is expected per host; entries without a mapping in
    # service_roles are skipped by the `when` condition
    - name: Include service-specific roles
      include_role:
        name: "{{ service_roles[item] }}"
      loop: "{{ server_services | default([]) }}"
      when: item in service_roles
      tags: [services]
    
    # NOTE(review): `environment` is also an Ansible playbook keyword, so it
    # is presumably not a safe variable name — confirm and consider renaming.
    - name: Include environment-specific roles
      include_role:
        name: "{{ item }}"
      loop: "{{ environment_roles[environment] | default([]) }}"
      when: environment in environment_roles
      tags: [environment]
    
    # NOTE(review): `vars` is nested inside the include_role arguments here;
    # include_role presumably has no such option (vars is normally a
    # task-level keyword) — verify that this task actually runs.
    - name: Include custom roles
      include_role:
        name: "{{ item.name }}"
        vars: "{{ item.vars | default({}) }}"
      loop: "{{ custom_roles | default([]) }}"
      tags: [custom]

9.2.4 动态 Playbook 包含

# playbooks/site.yml
# Main site playbook
---
# Baseline system configuration
- import_playbook: common.yml

# Environment-specific playbook.
# Playbooks can only be imported statically (there is no include_playbook),
# so `environment` has to be known at parse time, e.g. -e environment=production.
# The `when` condition is applied to the tasks of the imported playbook.
- import_playbook: "{{ environment }}.yml"
  when: environment is defined

# Group-specific playbooks: the condition is evaluated per host, so only
# members of the named group run the imported tasks
- import_playbook: webservers.yml
  when: "'webservers' in group_names"

- import_playbook: databases.yml
  when: "'databases' in group_names"

- import_playbook: loadbalancers.yml
  when: "'loadbalancers' in group_names"
# playbooks/production.yml
# Playbook specific to the production environment
---
- name: Production environment setup
  hosts: all
  # Rolling update: batch size comes from rolling_update_batch_size,
  # falling back to one host at a time
  serial: "{{ rolling_update_batch_size | default(1) }}"
  max_fail_percentage: 10
  
  # Runs before the roles
  pre_tasks:
    - name: Check system requirements
      include_tasks: tasks/production/pre_checks.yml
      tags: [pre_checks]
  
  roles:
    - role: security_hardening
      tags: [security]
    
    - role: monitoring
      vars:
        monitoring_environment: production
        alert_email: ops@example.com
      tags: [monitoring]
    
    - role: backup
      vars:
        # Cron expression: every day at 02:00
        backup_schedule: "0 2 * * *"
        backup_retention: 30
      tags: [backup]
  
  # Runs after the roles
  post_tasks:
    - name: Validate deployment
      include_tasks: tasks/production/post_checks.yml
      tags: [post_checks]
    
    - name: Send deployment notification
      include_tasks: tasks/notifications.yml
      vars:
        notification_type: deployment_complete
      tags: [notifications]

9.3 错误处理和调试

9.3.1 错误处理策略

# playbooks/error_handling.yml
# Error-handling patterns
---
- name: Error handling examples
  hosts: webservers
  vars:
    max_retries: 3
    retry_delay: 5

  tasks:
    # 1. Ignore a failure and carry on
    - name: Try to stop service (ignore if not running)
      service:
        name: nginx
        state: stopped
      ignore_errors: true
      tags:
        - service

    # 2. Custom failure condition: fail when root filesystem usage exceeds 90%
    - name: Check disk space
      shell: df -h / | tail -1 | awk '{print $5}' | sed 's/%//'
      register: disk_usage
      changed_when: false
      failed_when: disk_usage.stdout | int > 90
      tags:
        - disk_check

    # 3. Retry until the download succeeds
    - name: Download file with retries
      get_url:
        url: "{{ download_url }}"
        dest: "/tmp/{{ download_file }}"
        timeout: 30
      register: download_result
      until: download_result is succeeded
      retries: "{{ max_retries }}"
      delay: "{{ retry_delay }}"
      tags:
        - download

    # 4. Block-level handling: block = try, rescue = except, always = finally
    - name: Database operations with error handling
      tags:
        - database
      block:
        - name: Create database backup
          mysql_db:
            name: "{{ db_name }}"
            state: dump
            target: "/backup/{{ db_name }}_{{ ansible_date_time.epoch }}.sql"

        - name: Update database schema
          mysql_db:
            name: "{{ db_name }}"
            state: import
            target: "/tmp/schema_update.sql"

        - name: Verify database integrity
          mysql_query:
            login_db: "{{ db_name }}"
            query: "CHECK TABLE users, products, orders"
          register: integrity_check
          failed_when: "'error' in integrity_check.query_result[0] | lower"

      rescue:
        # Runs only if a task in the block failed
        - name: Restore database from backup
          mysql_db:
            name: "{{ db_name }}"
            state: import
            target: "/backup/{{ db_name }}_{{ ansible_date_time.epoch }}.sql"

        - name: Send alert notification
          mail:
            to: dba@example.com
            subject: "Database update failed on {{ inventory_hostname }}"
            body: "Database schema update failed and was rolled back."

        - name: Fail the playbook
          fail:
            msg: "Database update failed and was rolled back"

      always:
        # Runs whether or not the block succeeded
        - name: Clean up temporary files
          file:
            path: "/tmp/schema_update.sql"
            state: absent

    # 5. Deferred failure: record the result first, decide afterwards
    - name: Check service health
      uri:
        url: "http://{{ inventory_hostname }}:{{ app_port }}/health"
        method: GET
        status_code: 200
      register: health_check
      failed_when: false  # never fail here; the next task decides
      tags:
        - health_check

    - name: Fail if health check failed
      fail:
        msg: "Service health check failed: {{ health_check.msg | default('Unknown error') }}"
      when: health_check.status != 200
      tags:
        - health_check

9.3.2 调试技巧

# playbooks/debugging.yml
# Debugging techniques
---
- name: Debugging techniques
  hosts: localhost
  vars:
    # True when the playbook is run with -vv or more
    debug_mode: "{{ ansible_verbosity >= 2 }}"
    app_config:
      database:
        host: db.example.com
        port: 5432
        name: myapp
      cache:
        enabled: true
        ttl: 3600

  tasks:
    # 1. Print the contents of a variable
    - name: Debug variable contents
      debug:
        var: app_config
      when: debug_mode
      tags: [debug]

    - name: Debug with custom message
      debug:
        msg: "Processing {{ inventory_hostname }} in {{ environment }} environment"
      when: debug_mode
      tags: [debug]

    # 2. Conditional debugging (all listed conditions must hold)
    - name: Debug only when condition is met
      debug:
        msg: "Warning: Running in development mode"
      when:
        - environment == 'development'
        - debug_mode
      tags: [debug]

    # 3. Inspect a registered variable
    - name: Get system information
      setup:
        gather_subset:
          - hardware
          - network
      register: system_facts
      tags: [facts]

    - name: Debug system facts
      debug:
        var: system_facts.ansible_facts.ansible_memtotal_mb
      when: debug_mode
      tags: [debug, facts]

    # 4. Validate assumptions with assert
    - name: Validate configuration
      assert:
        that:
          - app_config.database.host is defined
          - app_config.database.port | int > 0
          - app_config.database.name | length > 0
        fail_msg: "Invalid database configuration"
        success_msg: "Database configuration is valid"
      tags: [validation]

    # 5. Pause the run for manual inspection
    - name: Pause for manual inspection
      pause:
        prompt: "Press enter to continue or Ctrl+C to abort"
      when:
        - debug_mode
        - manual_pause | default(false)
      tags: [debug, pause]

    # 6. Show how a template renders without writing it anywhere
    - name: Debug template rendering
      debug:
        msg: "{{ lookup('template', 'config.j2') }}"
      when: debug_mode
      tags: [debug, template]

    # 7. Inspect loop variables
    - name: Debug loop variables
      debug:
        msg: "Processing item {{ item }} (index: {{ ansible_loop.index }})"
      loop:
        - web1
        - web2
        - web3
      # ansible_loop is only defined when extended loop info is enabled;
      # without this the task fails with an undefined-variable error
      loop_control:
        extended: true
      when: debug_mode
      tags: [debug, loop]

9.3.3 日志和审计

# ansible.cfg
# Logging configuration
[defaults]
# Write a log of every run to this file
log_path = /var/log/ansible.log

# Default verbosity level
verbosity = 1

# Enable callbacks that report run time and per-task timing
# NOTE(review): newer ansible-core releases presumably renamed this option to
# callbacks_enabled — confirm against the installed version.
callback_whitelist = timer, profile_tasks

# Verify SSH host keys when connecting
host_key_checking = True

[callback_profile_tasks]
# Settings for the profile_tasks callback
task_output_limit = 20
sort_order = descending
#!/usr/bin/env python3
# callback_plugins/audit_log.py
# 自定义审计日志插件

from ansible.plugins.callback import CallbackBase
import json
import datetime
import os

class CallbackModule(CallbackBase):
    """Audit-log callback plugin.

    Appends one JSON object per line to an audit log file for playbook start,
    task start, task success, task failure and the final statistics. The log
    path comes from the ``ANSIBLE_AUDIT_LOG`` environment variable and
    defaults to ``/var/log/ansible-audit.log``.
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'audit_log'

    def __init__(self):
        super(CallbackModule, self).__init__()
        self.audit_log_path = os.environ.get('ANSIBLE_AUDIT_LOG', '/var/log/ansible-audit.log')
        # Used to compute the total duration in v2_playbook_on_stats
        self.start_time = datetime.datetime.now()

    def _write_audit_log(self, event_type, data):
        """Append a single audit entry to the log file.

        Args:
            event_type: short label for the event (e.g. ``'task_ok'``).
            data: event payload, stored under the ``data`` key.

        A failure to serialize or write is reported as an Ansible warning and
        never interrupts the playbook run.
        """
        audit_entry = {
            'timestamp': datetime.datetime.now().isoformat(),
            'event_type': event_type,
            # _playbook_name is only set once v2_playbook_on_start has run
            'playbook': getattr(self, '_playbook_name', 'unknown'),
            'user': os.environ.get('USER', 'unknown'),
            'data': data
        }

        try:
            # default=str: task results can hold values json cannot encode;
            # stringify them instead of losing the whole audit entry
            line = json.dumps(audit_entry, default=str)
            with open(self.audit_log_path, 'a') as f:
                f.write(line + '\n')
        except Exception as e:
            self._display.warning(f"Failed to write audit log: {e}")

    def v2_playbook_on_start(self, playbook):
        """Record the start of a playbook and remember its file name."""
        self._playbook_name = os.path.basename(playbook._file_name)
        self._write_audit_log('playbook_start', {
            'playbook': self._playbook_name,
            'file_path': playbook._file_name
        })

    def v2_playbook_on_task_start(self, task, is_conditional):
        """Record that a task is about to run."""
        self._write_audit_log('task_start', {
            'task_name': task.get_name(),
            'task_action': task.action,
            'is_conditional': is_conditional
        })

    def v2_runner_on_ok(self, result):
        """Record a successful task result for one host."""
        self._write_audit_log('task_ok', {
            'host': result._host.get_name(),
            'task_name': result._task.get_name(),
            'changed': result._result.get('changed', False),
            'result': result._result
        })

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Record a failed task result for one host."""
        self._write_audit_log('task_failed', {
            'host': result._host.get_name(),
            'task_name': result._task.get_name(),
            'error': result._result.get('msg', 'Unknown error'),
            'ignore_errors': ignore_errors,
            'result': result._result
        })

    def v2_playbook_on_stats(self, stats):
        """Record the per-host summary and the total run duration."""
        end_time = datetime.datetime.now()
        duration = (end_time - self.start_time).total_seconds()

        summary = {}
        for host in stats.processed:
            summary[host] = {
                'ok': stats.ok.get(host, 0),
                'changed': stats.changed.get(host, 0),
                'unreachable': stats.dark.get(host, 0),
                'failed': stats.failures.get(host, 0),
                'skipped': stats.skipped.get(host, 0)
            }

        self._write_audit_log('playbook_end', {
            'duration_seconds': duration,
            'summary': summary
        })

9.4 性能优化

9.4.1 并行执行优化

# playbooks/performance.yml
# Performance optimization patterns
---
- name: Performance optimization examples
  hosts: webservers
  strategy: free  # hosts proceed through tasks without waiting for each other
  serial: 5       # process five hosts per batch
  max_fail_percentage: 20  # tolerate up to 20% failed hosts

  vars:
    # Connection tuning
    ansible_ssh_pipelining: true
    # NOTE(review): confirm that ansible_ssh_multiplexing is a variable
    # Ansible actually reads.
    ansible_ssh_multiplexing: true

  tasks:
    # 1. Collect only the facts that are needed (network subset)
    - name: Skip fact gathering for simple tasks
      when: minimal_facts | default(false)
      setup:
        gather_subset:
          - '!all'
          - '!any'
          - network
      tags: [facts]

    # 2. Start long-running work asynchronously
    - name: Long running task (async)
      command: /usr/bin/long_running_script.sh
      register: long_task
      async: 300  # allow up to five minutes
      poll: 0     # fire and forget; status is checked in step 6
      tags: [async]

    # 3. Pass a list to the module instead of looping over packages
    - name: Install multiple packages at once
      vars:
        packages:
          - nginx
          - mysql-server
          - redis-server
          - nodejs
      package:
        state: present
        name: "{{ packages }}"
      tags: [packages]

    # 4. Read-only check: never reported as changed; rc 0 and 3 are accepted
    - name: Check service status
      command: systemctl is-active nginx
      register: nginx_status
      failed_when: nginx_status.rc not in [0, 3]
      changed_when: false
      tags: [status]

    # 5. Run a task only where it applies
    - name: Configure firewall (only if needed)
      when:
        - firewall_enabled | default(true)
        - ansible_os_family == 'RedHat'
      firewalld:
        service: http
        state: enabled
        permanent: true
      tags: [firewall]

    # 6. Poll the async job from step 2 (30 tries, 10 seconds apart)
    - name: Wait for long running task
      when: long_task.ansible_job_id is defined
      async_status:
        jid: "{{ long_task.ansible_job_id }}"
      register: job_result
      until: job_result.finished
      delay: 10
      retries: 30
      tags: [async]

9.4.2 连接优化

# ansible.cfg
# Connection tuning
[defaults]
# Skip SSH host key verification.
# NOTE(review): together with UserKnownHostsFile=/dev/null below this removes
# protection against man-in-the-middle attacks; keep it to disposable hosts.
host_key_checking = False

# Enable SSH pipelining
pipelining = True

# Number of parallel worker processes
forks = 20

# Connection timeout in seconds
timeout = 30

# Disable cowsay
nocows = 1

# Fact caching: gather facts only when no valid cached copy exists
gathering = smart
fact_caching = jsonfile
fact_caching_connection = /tmp/ansible_facts_cache
fact_caching_timeout = 86400

[ssh_connection]
# Extra OpenSSH options (connection reuse via ControlMaster/ControlPersist).
# ssh_args is read from this section; it has no effect under [defaults].
ssh_args = -o ControlMaster=auto -o ControlPersist=60s -o UserKnownHostsFile=/dev/null -o IdentitiesOnly=yes

# Location and naming of the SSH control sockets
control_path_dir = ~/.ansible/cp
control_path = %(directory)s/%%h-%%p-%%r

# File transfer settings
transfer_method = smart
retries = 3

9.4.3 内存和 CPU 优化

# playbooks/resource_optimization.yml
# Resource optimization patterns
---
- name: Resource optimization
  hosts: all
  vars:
    # Limit the number of hosts handled at once: a quarter of the play's
    # hosts, rounded up
    batch_size: "{{ (ansible_play_hosts | length / 4) | round(0, 'ceil') | int }}"

  serial: "{{ batch_size }}"

  tasks:
    # 1. Prefer lineinfile over template for single-line changes
    - name: Update configuration (efficient)
      lineinfile:
        path: /etc/myapp/config.conf
        regexp: '^max_connections='
        line: "max_connections={{ max_connections | default(100) }}"
      tags: [config]

    # 2. Render a small, fixed set of templates in one looped task
    - name: Generate config files efficiently
      template:
        src: "{{ item.src }}"
        dest: "{{ item.dest }}"
      loop:
        - { src: 'nginx.conf.j2', dest: '/etc/nginx/nginx.conf' }
        - { src: 'php.ini.j2', dest: '/etc/php/7.4/fpm/php.ini' }
      tags: [templates]

    # 3. For small files, inline the content with copy
    - name: Create small config file
      copy:
        content: |
          # Generated by Ansible
          server_name={{ inventory_hostname }}
          environment={{ environment }}
          debug={{ debug_mode | default(false) | lower }}
        dest: /etc/myapp/server.conf
        mode: '0644'
      tags: [config]

    # 4. Batch file operations
    - name: Create multiple directories
      file:
        path: "{{ item }}"
        state: directory
        mode: '0755'
      loop:
        - /var/log/myapp
        - /var/lib/myapp
        - /etc/myapp/conf.d
        - /opt/myapp/bin
      tags: [directories]

    # 5. Use shell for complex operations (with care).
    # The former `args: warn: false` was dropped: current ansible-core
    # rejects the `warn` parameter, which made this task fail.
    - name: Complex file operations
      shell: |
        find /var/log/myapp -name "*.log" -mtime +7 -delete
        find /tmp -name "myapp_*" -mtime +1 -delete
      changed_when: false
      tags: [cleanup]

9.5 自定义模块和插件

9.5.1 自定义模块开发

#!/usr/bin/python
# library/custom_service.py
# 自定义服务管理模块

from ansible.module_utils.basic import AnsibleModule
import subprocess
import json

DOCUMENTATION = '''
---
module: custom_service
short_description: Custom service management module
description:
    - Manages services with additional features
    - Supports health checks and graceful restarts
version_added: "1.0"
author: "DevOps Team"
options:
    name:
        description:
            - Name of the service
        required: true
        type: str
    state:
        description:
            - Desired state of the service
        choices: ['started', 'stopped', 'restarted', 'reloaded']
        default: started
        type: str
    enabled:
        description:
            - Whether the service should start on boot
        type: bool
        default: true
    health_check_url:
        description:
            - URL to check service health
        type: str
    health_check_timeout:
        description:
            - Timeout for health check in seconds
        type: int
        default: 30
    graceful_timeout:
        description:
            - Timeout for graceful shutdown in seconds
        type: int
        default: 60
'''

EXAMPLES = '''
# Start a service with health check
- custom_service:
    name: nginx
    state: started
    health_check_url: http://localhost/health

# Graceful restart with custom timeout
- custom_service:
    name: myapp
    state: restarted
    graceful_timeout: 120
    health_check_url: http://localhost:8080/health
'''

RETURN = '''
changed:
    description: Whether the service state was changed
    type: bool
    returned: always
state:
    description: Current state of the service
    type: str
    returned: always
health_status:
    description: Health check result
    type: str
    returned: when health_check_url is provided
'''

def run_command(module, cmd):
    """Run a system command and return its exit status and output.

    The command string is split into an argument list and executed without a
    shell, so shell metacharacters in interpolated values (for example a
    service name) are passed through literally instead of being interpreted.

    Args:
        module: object providing ``fail_json(msg=...)``, used to report a
            timeout or an execution error.
        cmd: command line as a single string.

    Returns:
        tuple: ``(returncode, stdout, stderr)``.
    """
    import shlex  # local import so the block does not rely on file-level imports

    try:
        result = subprocess.run(
            shlex.split(cmd),
            shell=False,
            capture_output=True,
            text=True,
            timeout=30
        )
        return result.returncode, result.stdout, result.stderr
    except subprocess.TimeoutExpired:
        module.fail_json(msg=f"Command timed out: {cmd}")
    except Exception as e:
        module.fail_json(msg=f"Command failed: {e}")

def check_service_status(module, service_name):
    """Return True when ``systemctl is-active`` prints ``active`` for the service."""
    _, output, _ = run_command(module, f"systemctl is-active {service_name}")
    reported_state = output.strip()
    return reported_state == 'active'

def check_service_enabled(module, service_name):
    """Return True when ``systemctl is-enabled`` prints ``enabled`` for the service."""
    _, output, _ = run_command(module, f"systemctl is-enabled {service_name}")
    reported_state = output.strip()
    return reported_state == 'enabled'

def health_check(module, url, timeout):
    """Perform an HTTP health check against ``url``.

    Args:
        module: unused; kept for signature compatibility with the other helpers.
        url: URL to request.
        timeout: timeout in seconds passed to ``urlopen``.

    Returns:
        tuple: ``(healthy, message)``. ``healthy`` is True only for an HTTP
        200 response, with ``message`` set to ``"HTTP <code>"``. Any error
        yields ``(False, <error text>)``.
    """
    try:
        import urllib.request
        import urllib.error

        request = urllib.request.Request(url)
        # The context manager closes the response (and its socket) promptly
        with urllib.request.urlopen(request, timeout=timeout) as response:
            status = response.getcode()
        return status == 200, f"HTTP {status}"
    except Exception as e:
        # Covers urllib.error.URLError (including HTTPError) and everything
        # else, such as an invalid URL
        return False, str(e)

def manage_service(module):
    """Bring a service to the requested state and return a result dict.

    Reads ``name``, ``state``, ``enabled``, ``health_check_url``,
    ``health_check_timeout`` and ``graceful_timeout`` from ``module.params``,
    adjusts boot-time enablement and the running state through ``systemctl``,
    optionally runs an HTTP health check, and reports the final status.

    Returns:
        dict: ``changed``, ``name``, ``state`` (``'started'``/``'stopped'`` as
        observed at the end), ``enabled``, plus ``health_status`` and
        ``health_message`` when a health check was performed.

    Any failing systemctl call or a failed health check is reported through
    ``module.fail_json``.
    """
    name = module.params['name']
    state = module.params['state']
    enabled = module.params['enabled']
    health_check_url = module.params['health_check_url']
    health_check_timeout = module.params['health_check_timeout']
    # NOTE(review): graceful_timeout is read here but not used below.
    graceful_timeout = module.params['graceful_timeout']
    
    changed = False
    result = {
        'changed': False,
        'name': name,
        'state': state
    }
    
    # Inspect the current state
    is_active = check_service_status(module, name)
    is_enabled = check_service_enabled(module, name)
    
    # Manage boot-time enablement
    if enabled != is_enabled:
        action = 'enable' if enabled else 'disable'
        rc, stdout, stderr = run_command(module, f"systemctl {action} {name}")
        if rc != 0:
            module.fail_json(msg=f"Failed to {action} service: {stderr}")
        changed = True
    
    # Manage the running state
    if state == 'started' and not is_active:
        rc, stdout, stderr = run_command(module, f"systemctl start {name}")
        if rc != 0:
            module.fail_json(msg=f"Failed to start service: {stderr}")
        changed = True
    
    elif state == 'stopped' and is_active:
        # Stop the service
        rc, stdout, stderr = run_command(module, f"systemctl stop {name}")
        if rc != 0:
            module.fail_json(msg=f"Failed to stop service: {stderr}")
        changed = True
    
    elif state == 'restarted':
        if is_active:
            # Restart a running service
            rc, stdout, stderr = run_command(module, f"systemctl restart {name}")
        else:
            # Not running yet: a plain start is enough
            rc, stdout, stderr = run_command(module, f"systemctl start {name}")
        
        if rc != 0:
            module.fail_json(msg=f"Failed to restart service: {stderr}")
        changed = True
    
    # NOTE(review): when state is 'reloaded' and the service is inactive, no
    # branch matches and the service is left stopped — confirm this is intended.
    elif state == 'reloaded' and is_active:
        rc, stdout, stderr = run_command(module, f"systemctl reload {name}")
        if rc != 0:
            # If reload fails, fall back to a restart
            rc, stdout, stderr = run_command(module, f"systemctl restart {name}")
            if rc != 0:
                module.fail_json(msg=f"Failed to reload/restart service: {stderr}")
        changed = True
    
    # Health check
    if health_check_url and (state in ['started', 'restarted', 'reloaded']):
        import time
        time.sleep(2)  # give the service a moment to come up
        
        health_ok, health_msg = health_check(module, health_check_url, health_check_timeout)
        result['health_status'] = 'healthy' if health_ok else 'unhealthy'
        result['health_message'] = health_msg
        
        if not health_ok:
            module.fail_json(msg=f"Service health check failed: {health_msg}")
    
    # Re-read the final state so the result reflects what is actually running
    result['state'] = 'started' if check_service_status(module, name) else 'stopped'
    result['enabled'] = check_service_enabled(module, name)
    result['changed'] = changed
    
    return result

def main():
    """Module entry point: parse arguments, honour check mode, run the logic."""
    allowed_states = ['started', 'stopped', 'restarted', 'reloaded']
    spec = dict(
        name=dict(type='str', required=True),
        state=dict(type='str', default='started', choices=allowed_states),
        enabled=dict(type='bool', default=True),
        health_check_url=dict(type='str'),
        health_check_timeout=dict(type='int', default=30),
        graceful_timeout=dict(type='int', default=60),
    )
    module = AnsibleModule(argument_spec=spec, supports_check_mode=True)

    if module.check_mode:
        # Check mode: report the requested values without touching the system
        module.exit_json(
            changed=False,
            name=module.params['name'],
            state=module.params['state'],
        )

    try:
        outcome = manage_service(module)
        module.exit_json(**outcome)
    except Exception as exc:
        module.fail_json(msg=f"Unexpected error: {str(exc)}")


if __name__ == '__main__':
    main()

9.5.2 自定义过滤器插件

# filter_plugins/custom_filters.py
# 自定义过滤器插件

import re
import base64
import hashlib
import json
from datetime import datetime, timedelta

class FilterModule(object):
    """Collection of custom Jinja2 filters exposed to Ansible templates.

    ``filters()`` returns the mapping of filter name -> callable. When a
    filter is used as ``{{ value | name(arg) }}``, the piped ``value`` is
    passed as the first argument after ``self``.
    """

    def filters(self):
        """Return the mapping of filter names to their implementations."""
        return {
            # NOTE(review): this name matches Ansible's built-in
            # ``to_nice_json`` filter; confirm the override is intentional.
            'to_nice_json': self.to_nice_json,
            'mask_sensitive': self.mask_sensitive,
            'generate_password': self.generate_password,
            'validate_email': self.validate_email,
            'format_bytes': self.format_bytes,
            'time_ago': self.time_ago,
            'extract_domain': self.extract_domain,
            'safe_filename': self.safe_filename,
            'merge_dicts': self.merge_dicts,
            'flatten_list': self.flatten_list
        }

    def to_nice_json(self, data, indent=2):
        """Serialize ``data`` as indented, key-sorted JSON (non-ASCII kept as-is).

        Returns an ``"Error formatting JSON: ..."`` string instead of raising
        when ``data`` is not JSON-serializable (deliberate best-effort).
        """
        try:
            return json.dumps(data, indent=indent, sort_keys=True, ensure_ascii=False)
        except (TypeError, ValueError) as e:
            return f"Error formatting JSON: {e}"

    def mask_sensitive(self, value, mask_char='*', visible_chars=4):
        """Mask the middle of ``value``, keeping ``visible_chars`` characters visible.

        The visible characters are split between the start and the end of the
        string (the end gets the extra one when the count is odd). Non-string
        values, and strings no longer than ``visible_chars``, are replaced by
        eight mask characters so that nothing about them is revealed.
        """
        # A negative count makes no sense; treat it as "show nothing".
        visible_chars = max(visible_chars, 0)
        if not isinstance(value, str) or len(value) <= visible_chars:
            return mask_char * 8

        visible_start = visible_chars // 2
        visible_end = visible_chars - visible_start
        masked_length = len(value) - visible_chars

        # value[-0:] would be the whole string, so the tail needs its own guard.
        # The guard must apply to the tail only, not to the whole concatenation.
        tail = value[-visible_end:] if visible_end > 0 else ''
        return value[:visible_start] + mask_char * masked_length + tail

    def generate_password(self, length=12, include_symbols=True):
        """Generate a random password of ``length`` characters.

        Uses the ``secrets`` module so the result is suitable for credentials.
        When used as a Jinja2 filter the piped value becomes ``length``, e.g.
        ``{{ 16 | generate_password(true) }}``.
        """
        import secrets
        import string

        chars = string.ascii_letters + string.digits
        if include_symbols:
            chars += '!@#$%^&*()_+-=[]{}|;:,.<>?'

        # int() tolerates numeric strings coming out of templates.
        return ''.join(secrets.choice(chars) for _ in range(int(length)))

    def validate_email(self, email):
        """Return True when ``email`` looks like ``local@domain.tld``.

        Non-string input returns False rather than raising.
        """
        if not isinstance(email, str):
            return False
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        # fullmatch: a bare ``$`` would also accept a trailing newline.
        return re.fullmatch(pattern, email) is not None

    def format_bytes(self, bytes_value, precision=2):
        """Format a byte count using 1024-based units (B .. PB).

        Returns ``"Invalid input"`` when the value cannot be converted to a
        float. Negative values are scaled by magnitude and keep their sign.
        """
        try:
            bytes_value = float(bytes_value)
        except (TypeError, ValueError):
            return "Invalid input"

        units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
        unit_index = 0

        while abs(bytes_value) >= 1024 and unit_index < len(units) - 1:
            bytes_value /= 1024
            unit_index += 1

        return f"{bytes_value:.{precision}f} {units[unit_index]}"

    def time_ago(self, timestamp, format='%Y-%m-%d %H:%M:%S'):
        """Describe how long ago ``timestamp`` was, relative to ``datetime.now()``.

        ``timestamp`` is either a string parsed with ``format`` or a numeric
        epoch value. Returns ``"Invalid timestamp"`` for other types,
        ``"In the future"`` for timestamps later than now, and an
        ``"Error: ..."`` string when parsing fails.
        """
        try:
            if isinstance(timestamp, str):
                dt = datetime.strptime(timestamp, format)
            elif isinstance(timestamp, (int, float)):
                dt = datetime.fromtimestamp(timestamp)
            else:
                return "Invalid timestamp"

            elapsed = (datetime.now() - dt).total_seconds()

            # A negative timedelta has days=-1 and a large positive .seconds,
            # which would otherwise be reported as "N hours ago".
            if elapsed < 0:
                return "In the future"

            days = int(elapsed // 86400)
            if days > 0:
                return f"{days} days ago"
            if elapsed >= 3600:
                return f"{int(elapsed // 3600)} hours ago"
            if elapsed >= 60:
                return f"{int(elapsed // 60)} minutes ago"
            return "Just now"
        except Exception as e:
            return f"Error: {e}"

    def extract_domain(self, url_or_email):
        """Extract the host part from a URL or the domain from an email address.

        URLs are checked first so that credentials or query strings containing
        ``@`` do not get mistaken for an email address. A port, if present in
        the URL, is kept. Input that is neither is returned unchanged.
        """
        # URL: take the authority, stopping at path, query or fragment.
        match = re.search(r'https?://([^/?#]+)', url_or_email)
        if match:
            # Drop any "user:password@" prefix.
            return match.group(1).rsplit('@', 1)[-1]

        # Email address
        if '@' in url_or_email:
            return url_or_email.split('@')[-1]

        # Return as-is (may already be a bare domain)
        return url_or_email

    def safe_filename(self, filename):
        """Reduce ``filename`` to the characters ``a-zA-Z0-9._-``.

        Every other character becomes ``_``; runs of underscores are collapsed
        and leading/trailing underscores are stripped.
        """
        safe_chars = re.sub(r'[^a-zA-Z0-9._-]', '_', filename)
        safe_chars = re.sub(r'_+', '_', safe_chars)
        return safe_chars.strip('_')

    def merge_dicts(self, dict1, dict2, deep=True):
        """Merge ``dict2`` into a copy of ``dict1``; values from ``dict2`` win.

        With ``deep=True`` nested dicts present on both sides are merged
        recursively. If either argument is not a dict, ``dict2`` is returned.
        """
        if not isinstance(dict1, dict) or not isinstance(dict2, dict):
            return dict2

        result = dict1.copy()

        for key, value in dict2.items():
            if key in result and deep and isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = self.merge_dicts(result[key], value, deep)
            else:
                result[key] = value

        return result

    def flatten_list(self, nested_list):
        """Flatten arbitrarily nested ``list`` objects into a single list.

        Only ``list`` instances are descended into; other items are kept as-is.
        """
        def _flatten(lst):
            for item in lst:
                if isinstance(item, list):
                    yield from _flatten(item)
                else:
                    yield item

        return list(_flatten(nested_list))

9.5.3 使用自定义模块和过滤器

# playbooks/custom_modules_demo.yml
# 使用自定义模块和过滤器的示例
---
- name: Custom modules and filters demo
  hosts: webservers
  vars:
    # Sample secrets used only to demonstrate the mask_sensitive filter.
    sensitive_data:
      api_key: "abc123def456ghi789jkl012"
      database_password: "super_secret_password_123"

    user_data:
      name: "John Doe"
      email: "john.doe@example.com"
      created_at: "2023-01-15 10:30:00"

    # Byte counts fed to the format_bytes filter.
    file_sizes:
      - 1024
      - 2048576
      - 1073741824

    nested_config:
      database:
        host: localhost
        port: 5432
      cache:
        enabled: true
        ttl: 3600

  tasks:
    # Use the custom service module
    - name: Manage nginx with health check
      custom_service:
        name: nginx
        state: started
        enabled: true
        health_check_url: "http://{{ inventory_hostname }}/health"
        health_check_timeout: 30
      tags: [service]

    # Use the custom filters
    - name: Display masked sensitive data
      debug:
        msg: |
          API Key: {{ sensitive_data.api_key | mask_sensitive }}
          DB Password: {{ sensitive_data.database_password | mask_sensitive(visible_chars=6) }}
      tags: [debug, security]

    - name: Validate and extract email domain
      debug:
        msg: |
          Email: {{ user_data.email }}
          Valid: {{ user_data.email | validate_email }}
          Domain: {{ user_data.email | extract_domain }}
      tags: [debug, email]

    - name: Format file sizes
      debug:
        msg: "File {{ item }} bytes = {{ item | format_bytes }}"
      loop: "{{ file_sizes }}"
      tags: [debug, files]

    - name: Show time ago
      debug:
        msg: "User created: {{ user_data.created_at | time_ago }}"
      tags: [debug, time]

    - name: Generate configuration with nice JSON
      copy:
        content: "{{ nested_config | to_nice_json }}"
        dest: "/tmp/config.json"
        mode: '0644'
      tags: [config]

    - name: Create safe filename
      debug:
        msg: "Safe filename: {{ 'My File (2023-01-15) [FINAL].txt' | safe_filename }}"
      tags: [debug, filename]

    # The piped value is the filter's first argument (length), so the length
    # goes on the left of the pipe and include_symbols in the parentheses.
    - name: Generate random password
      debug:
        msg: "Generated password: {{ 16 | generate_password(true) }}"
      tags: [debug, password]
      no_log: true  # keep the generated password out of the logs

9.6 本章总结

本章介绍了 Ansible 的高级特性和技巧:

  • Vault 加密管理:保护敏感数据的安全性
  • 动态包含和导入:提高 Playbook 的灵活性和可维护性
  • 错误处理和调试:增强 Playbook 的健壮性和可调试性
  • 性能优化:提高 Ansible 执行效率
  • 自定义模块和插件:扩展 Ansible 功能

这些高级特性使 Ansible 能够应对复杂的自动化场景,提供企业级的解决方案。

9.7 练习题

基础练习

  1. Vault 使用

    • 创建加密的变量文件
    • 在 Playbook 中使用加密变量
    • 实现多环境的密钥管理
  2. 动态包含

    • 实现基于操作系统的动态任务包含
    • 创建条件性的角色包含
    • 设计灵活的 Playbook 结构

进阶练习

  1. 错误处理

    • 实现复杂的错误处理逻辑
    • 设计回滚机制
    • 添加详细的日志和审计功能
  2. 性能优化

    • 优化大规模部署的性能
    • 实现智能的批处理策略
    • 分析和解决性能瓶颈

实战练习

  1. 自定义模块开发

    • 开发特定业务需求的自定义模块
    • 实现完整的错误处理和文档
    • 编写模块测试用例
  2. 企业级解决方案

    • 设计完整的企业级自动化方案
    • 集成监控、日志和安全功能
    • 实现 CI/CD 流水线集成

下一章:第10章 实战项目案例

返回目录:Ansible 自动化运维教程