9.1 Ansible Vault 加密管理
9.1.1 Vault 基础概念
Ansible Vault 是 Ansible 内置的加密功能,用于保护敏感数据如密码、密钥、证书等。Vault 使用 AES256 加密算法,确保敏感信息的安全性。
Vault 的用途:
- 加密密码和 API 密钥
- 保护 SSL 证书和私钥
- 加密配置文件中的敏感部分
- 安全地存储数据库连接信息
- 保护云服务凭证
9.1.2 Vault 基本操作
# Create a new encrypted file
ansible-vault create secrets.yml
# Edit an encrypted file
ansible-vault edit secrets.yml
# View the contents of an encrypted file
ansible-vault view secrets.yml
# Encrypt an existing plaintext file
ansible-vault encrypt plaintext.yml
# Decrypt a file
ansible-vault decrypt secrets.yml
# Change the password of an encrypted file
ansible-vault rekey secrets.yml
# Encrypt a single string and label it with a variable name
ansible-vault encrypt_string 'secret_password' --name 'db_password'
# Read the vault password from a file instead of prompting
ansible-vault create --vault-password-file ~/.vault_pass secrets.yml
9.1.3 Vault 文件示例
# group_vars/production/vault.yml
# Encrypted variables for the production environment (file as stored on disk)
$ANSIBLE_VAULT;1.1;AES256
66386439653162336464643965393835663365356332643732393064663533343061613431313265
6664373764613963653965383138316364616533373734360a653638643435666633633964366235
63386435626139623965353965653266306164626565653637343334376165323030643936653030
3438626666666137650a353638643435666633633964366235633864356261396239653539653532
66306164626565653637343334376165323030643936653030343862666666613765
# Contents after decryption:
---
vault_mysql_root_password: super_secret_password
vault_api_key: abc123def456ghi789
# Multi-line value: the literal block scalar keeps the PEM line breaks
vault_ssl_private_key: |
  -----BEGIN PRIVATE KEY-----
  MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC7...
  -----END PRIVATE KEY-----
vault_database_url: postgresql://user:pass@db.example.com:5432/myapp
# group_vars/production/vars.yml
# Plain (unencrypted) variable file that references the encrypted variables
---
# Database and API settings: each public name points at its vault_* value
mysql_root_password: "{{ vault_mysql_root_password }}"
api_key: "{{ vault_api_key }}"
ssl_private_key: "{{ vault_ssl_private_key }}"
database_url: "{{ vault_database_url }}"
# Other, non-sensitive settings
mysql_port: 3306
mysql_host: localhost
api_endpoint: https://api.example.com
9.1.4 多密码文件管理
# Create one password file per environment.
# The subshell runs with umask 077, so each file is owner-only from the moment
# it is created and the password is never briefly readable by other users.
(
  umask 077
  echo "production_vault_password" > ~/.vault_pass_prod
  echo "staging_vault_password" > ~/.vault_pass_staging
  echo "development_vault_password" > ~/.vault_pass_dev
)
# Also tighten permissions on password files that already existed
chmod 600 ~/.vault_pass_*
# Use the password file that matches the target environment
ansible-playbook -i inventory/production site.yml --vault-password-file ~/.vault_pass_prod
ansible-playbook -i inventory/staging site.yml --vault-password-file ~/.vault_pass_staging
#!/usr/bin/env python3
# scripts/vault_password.py
# 动态密码脚本
import os
import sys
import subprocess
def get_vault_password():
    """Return the Vault password for the current environment.

    The environment name is read from ``ANSIBLE_ENVIRONMENT`` and defaults to
    ``development``. Sources are tried in this order:

    1. the ``pass`` password manager, entry ``ansible/vault/<environment>``
    2. the environment variable ``ANSIBLE_VAULT_PASSWORD_<ENVIRONMENT>``
    3. an interactive prompt

    Returns:
        str: the password from the first source that provides one.
    """
    environment = os.environ.get('ANSIBLE_ENVIRONMENT', 'development')
    try:
        # Primary source: the pass password manager
        result = subprocess.run(
            ['pass', f'ansible/vault/{environment}'],
            capture_output=True,
            text=True,
            check=True
        )
        return result.stdout.strip()
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: pass exited non-zero.
        # OSError (e.g. FileNotFoundError): pass could not be executed at all.
        # Either way, continue with the fallbacks below.
        pass

    # Fallback: environment variable
    password = os.environ.get(f'ANSIBLE_VAULT_PASSWORD_{environment.upper()}')
    if password:
        return password

    # Last resort: ask the user
    import getpass
    return getpass.getpass(f'Vault password for {environment}: ')


if __name__ == '__main__':
    # The password is handed to the caller on stdout
    print(get_vault_password())
# The script must be executable: Ansible runs an executable password file and
# reads its stdout, but treats a non-executable file's contents as the password.
chmod +x scripts/vault_password.py
# Use the dynamic password script
ansible-playbook site.yml --vault-password-file scripts/vault_password.py
# Choose the environment the script looks up, then run again
export ANSIBLE_ENVIRONMENT=production
ansible-playbook site.yml --vault-password-file scripts/vault_password.py
9.1.5 内联加密变量
# playbooks/deploy.yml
# Using inline-encrypted variables in a playbook
---
- name: Deploy application with encrypted secrets
  hosts: webservers
  vars:
    # Inline-encrypted database password
    db_password: !vault |
      $ANSIBLE_VAULT;1.1;AES256
      66386439653162336464643965393835663365356332643732393064663533343061613431313265
      6664373764613963653965383138316364616533373734360a653638643435666633633964366235
      63386435626139623965353965653266306164626565653637343334376165323030643936653030
      3438626666666137650a353638643435666633633964366235633864356261396239653539653532
      66306164626565653637343334376165323030643936653030343862666666613765
    # Inline-encrypted API secret
    api_secret: !vault |
      $ANSIBLE_VAULT;1.1;AES256
      33663365356332643732393064663533343061613431313265366437376461396365396538313831
      6364616533373734360a653638643435666633633964366235633864356261396239653539653532
      66306164626565653637343334376165323030643936653030343862666666613765
  tasks:
    - name: Configure database connection
      template:
        src: database.conf.j2
        dest: /etc/myapp/database.conf
        mode: '0600'
      vars:
        database_password: "{{ db_password }}"
      # Keep the decrypted value out of task output, logs and --diff
      no_log: true

    - name: Set API configuration
      lineinfile:
        path: /etc/myapp/api.conf
        regexp: '^api_secret='
        line: "api_secret={{ api_secret }}"
        mode: '0600'
      # The rendered line contains the secret, so hide the task result
      no_log: true
9.2 动态包含和导入
9.2.1 include vs import 的区别
# Static imports (resolved when the playbook is parsed)
- import_tasks: setup.yml
- import_playbook: common.yml
- import_role:
    name: nginx
# Dynamic includes (resolved at run time, when the task is reached)
- include_tasks: "{{ ansible_os_family }}.yml"
- include_role:
    name: "{{ web_server_type }}"
# Ansible has no include_playbook: playbooks can only be imported statically.
# A variable in the file name must therefore be known at parse time
# (for example passed with -e), not taken from host facts.
- import_playbook: "{{ environment }}.yml"
9.2.2 动态任务包含
# playbooks/main.yml
# Main playbook: picks task files at run time
---
- name: Dynamic task inclusion example
  hosts: all
  vars:
    # Maps an OS family to the task file that handles it
    os_tasks:
      RedHat: redhat_tasks.yml
      Debian: debian_tasks.yml
      Windows: windows_tasks.yml
  tasks:
    # Skipped for hosts whose OS family has no entry in os_tasks
    - name: Include OS-specific tasks
      include_tasks: "{{ os_tasks[ansible_os_family] }}"
      when: ansible_os_family in os_tasks

    - name: Include tasks based on server role
      include_tasks: "roles/{{ server_role }}/tasks/main.yml"
      when: server_role is defined

    # NOTE(review): `environment` is also the name of an Ansible play/task
    # keyword; inside a task the variable may resolve to that keyword's value
    # instead of the inventory value - confirm, or use another variable name.
    - name: Include environment-specific tasks
      include_tasks: "tasks/{{ environment }}/{{ item }}.yml"
      loop:
        - security
        - monitoring
        - backup
      when: environment in ['production', 'staging']
# tasks/redhat_tasks.yml
# Tasks for hosts in the RedHat OS family
---
- name: Install EPEL repository
  yum:
    name: epel-release
    state: present

# Runs only when update_packages is set to a true value
- name: Update all packages
  yum:
    name: "*"
    state: latest
  when: update_packages | default(false)

- name: Install RedHat specific packages
  yum:
    name: "{{ redhat_packages }}"
    state: present
  vars:
    redhat_packages:
      - firewalld
      - policycoreutils-python-utils
      - selinux-policy-targeted

# selinux_state defaults to enforcing when not provided
- name: Configure SELinux
  selinux:
    policy: targeted
    state: "{{ selinux_state | default('enforcing') }}"
  notify: reboot system

- name: Start and enable firewalld
  systemd:
    name: firewalld
    state: started
    enabled: true
# tasks/debian_tasks.yml
# Tasks for hosts in the Debian OS family
---
- name: Update package cache
  apt:
    update_cache: true
    cache_valid_time: 3600

# Runs only when update_packages is set to a true value
- name: Upgrade all packages
  apt:
    upgrade: dist
  when: update_packages | default(false)

- name: Install Debian specific packages
  apt:
    name: "{{ debian_packages }}"
    state: present
  vars:
    debian_packages:
      - ufw
      - apparmor
      - apparmor-utils

# The SSH rule is added BEFORE the firewall is switched on with a default-deny
# policy; in the opposite order Ansible's own SSH access could be cut off.
- name: Allow SSH through firewall
  ufw:
    rule: allow
    port: '22'
    proto: tcp

- name: Configure UFW firewall
  ufw:
    state: enabled
    policy: deny
    direction: incoming
9.2.3 条件性角色包含
# playbooks/infrastructure.yml
# Infrastructure deployment playbook
---
- name: Deploy infrastructure components
  hosts: all
  vars:
    # Maps a service name to the role that installs it
    service_roles:
      web: nginx
      api: nodejs
      database: mysql
      cache: redis
      queue: rabbitmq
      monitoring: prometheus
    # Extra roles applied per environment
    environment_roles:
      production:
        - security_hardening
        - log_shipping
        - backup
      staging:
        - development_tools
      development:
        - debug_tools
        - test_data
  tasks:
    - name: Include base system role
      include_role:
        name: common
      tags: [common, base]

    # server_services is expected per host; unknown services are skipped
    - name: Include service-specific roles
      include_role:
        name: "{{ service_roles[item] }}"
      loop: "{{ server_services | default([]) }}"
      when: item in service_roles
      tags: [services]

    # NOTE(review): `environment` is also the name of an Ansible play/task
    # keyword and may not resolve to the inventory value inside a task - confirm.
    - name: Include environment-specific roles
      include_role:
        name: "{{ item }}"
      loop: "{{ environment_roles[environment] | default([]) }}"
      when: environment in environment_roles
      tags: [environment]

    # `vars:` has to be a literal mapping and cannot be a template string, so
    # each entry's settings are handed over under one well-known key; the
    # included role reads them from custom_role_vars.
    - name: Include custom roles
      include_role:
        name: "{{ item.name }}"
      vars:
        custom_role_vars: "{{ item.vars | default({}) }}"
      loop: "{{ custom_roles | default([]) }}"
      tags: [custom]
9.2.4 动态 Playbook 包含
# playbooks/site.yml
# Main site playbook
---
# Base system configuration
- import_playbook: common.yml

# Environment-specific playbook.
# Playbooks can only be imported (there is no include_playbook), and imports
# are resolved at parse time, so `environment` must be known up front,
# e.g. `ansible-playbook site.yml -e environment=production`.
- import_playbook: "{{ environment }}.yml"
  when: environment is defined

# Host-group specific playbooks. The condition is attached to the imported
# plays' tasks and evaluated per host.
- import_playbook: webservers.yml
  when: "'webservers' in group_names"

- import_playbook: databases.yml
  when: "'databases' in group_names"

- import_playbook: loadbalancers.yml
  when: "'loadbalancers' in group_names"
# playbooks/production.yml
# Playbook specific to the production environment
---
- name: Production environment setup
  hosts: all
  # Rolling update: batch size defaults to one host at a time
  serial: "{{ rolling_update_batch_size | default(1) }}"
  max_fail_percentage: 10
  pre_tasks:
    - name: Check system requirements
      include_tasks: tasks/production/pre_checks.yml
      tags: [pre_checks]
  roles:
    - role: security_hardening
      tags: [security]
    - role: monitoring
      vars:
        monitoring_environment: production
        alert_email: ops@example.com
      tags: [monitoring]
    - role: backup
      vars:
        # Cron expression: 02:00 every day
        backup_schedule: "0 2 * * *"
        backup_retention: 30
      tags: [backup]
  post_tasks:
    - name: Validate deployment
      include_tasks: tasks/production/post_checks.yml
      tags: [post_checks]
    - name: Send deployment notification
      include_tasks: tasks/notifications.yml
      vars:
        notification_type: deployment_complete
      tags: [notifications]
9.3 错误处理和调试
9.3.1 错误处理策略
# playbooks/error_handling.yml
# Error-handling examples
---
- name: Error handling examples
  hosts: webservers
  vars:
    max_retries: 3
    retry_delay: 5
  tasks:
    # 1. Ignore a failure and carry on
    - name: Try to stop service (ignore if not running)
      service:
        name: nginx
        state: stopped
      ignore_errors: true
      tags: [service]

    # 2. Custom failure condition: fail when / is more than 90% full
    - name: Check disk space
      shell: df -h / | tail -1 | awk '{print $5}' | sed 's/%//'
      register: disk_usage
      failed_when: disk_usage.stdout | int > 90
      changed_when: false
      tags: [disk_check]

    # 3. Retry until the download succeeds
    - name: Download file with retries
      get_url:
        url: "{{ download_url }}"
        dest: "/tmp/{{ download_file }}"
        timeout: 30
      register: download_result
      retries: "{{ max_retries }}"
      delay: "{{ retry_delay }}"
      until: download_result is succeeded
      tags: [download]

    # 4. Block-level handling: back up, change, verify; roll back on failure
    - name: Database operations with error handling
      block:
        - name: Create database backup
          mysql_db:
            name: "{{ db_name }}"
            state: dump
            target: "/backup/{{ db_name }}_{{ ansible_date_time.epoch }}.sql"

        - name: Update database schema
          mysql_db:
            name: "{{ db_name }}"
            state: import
            target: "/tmp/schema_update.sql"

        - name: Verify database integrity
          mysql_query:
            login_db: "{{ db_name }}"
            query: "CHECK TABLE users, products, orders"
          register: integrity_check
          failed_when: "'error' in integrity_check.query_result[0] | lower"
      rescue:
        # Uses the same path expression as the backup task above
        - name: Restore database from backup
          mysql_db:
            name: "{{ db_name }}"
            state: import
            target: "/backup/{{ db_name }}_{{ ansible_date_time.epoch }}.sql"

        - name: Send alert notification
          mail:
            to: dba@example.com
            subject: "Database update failed on {{ inventory_hostname }}"
            body: "Database schema update failed and was rolled back."

        # Re-raise so the play still reports the failure after the rollback
        - name: Fail the playbook
          fail:
            msg: "Database update failed and was rolled back"
      always:
        - name: Clean up temporary files
          file:
            path: "/tmp/schema_update.sql"
            state: absent
      tags: [database]

    # 5. Conditional failure: record the result first, decide afterwards
    - name: Check service health
      uri:
        url: "http://{{ inventory_hostname }}:{{ app_port }}/health"
        method: GET
        status_code: 200
      register: health_check
      failed_when: false  # never fail here; the next task decides
      tags: [health_check]

    - name: Fail if health check failed
      fail:
        msg: "Service health check failed: {{ health_check.msg | default('Unknown error') }}"
      when: health_check.status != 200
      tags: [health_check]
9.3.2 调试技巧
# playbooks/debugging.yml
# Debugging technique examples
---
- name: Debugging techniques
  hosts: localhost
  vars:
    # True when the playbook runs with -vv or higher
    debug_mode: "{{ ansible_verbosity >= 2 }}"
    app_config:
      database:
        host: db.example.com
        port: 5432
        name: myapp
      cache:
        enabled: true
        ttl: 3600
  tasks:
    # 1. Print variable contents
    - name: Debug variable contents
      debug:
        var: app_config
      when: debug_mode
      tags: [debug]

    # NOTE(review): `environment` is also the name of an Ansible play/task
    # keyword and may not resolve to the inventory value inside a task - confirm.
    - name: Debug with custom message
      debug:
        msg: "Processing {{ inventory_hostname }} in {{ environment }} environment"
      when: debug_mode
      tags: [debug]

    # 2. Conditional debugging
    - name: Debug only when condition is met
      debug:
        msg: "Warning: Running in development mode"
      when:
        - environment == 'development'
        - debug_mode
      tags: [debug]

    # 3. Inspect a registered variable
    - name: Get system information
      setup:
        gather_subset:
          - hardware
          - network
      register: system_facts
      tags: [facts]

    - name: Debug system facts
      debug:
        var: system_facts.ansible_facts.ansible_memtotal_mb
      when: debug_mode
      tags: [debug, facts]

    # 4. Validate with assert
    - name: Validate configuration
      assert:
        that:
          - app_config.database.host is defined
          - app_config.database.port | int > 0
          - app_config.database.name | length > 0
        fail_msg: "Invalid database configuration"
        success_msg: "Database configuration is valid"
      tags: [validation]

    # 5. Pause for manual inspection
    - name: Pause for manual inspection
      pause:
        prompt: "Press enter to continue or Ctrl+C to abort"
      when:
        - debug_mode
        - manual_pause | default(false)
      tags: [debug, pause]

    # 6. Show how a template renders
    - name: Debug template rendering
      debug:
        msg: "{{ lookup('template', 'config.j2') }}"
      when: debug_mode
      tags: [debug, template]

    # 7. Inspect loop variables.
    # ansible_loop only exists when extended loop information is enabled.
    - name: Debug loop variables
      debug:
        msg: "Processing item {{ item }} (index: {{ ansible_loop.index }})"
      loop:
        - web1
        - web2
        - web3
      loop_control:
        extended: true
      when: debug_mode
      tags: [debug, loop]
9.3.3 日志和审计
# ansible.cfg
# Logging configuration
[defaults]
# Enable logging to a file
log_path = /var/log/ansible.log
# Record more detail
verbosity = 1
# Show task timings. callbacks_enabled replaces callback_whitelist, which was
# deprecated in ansible-core 2.11; keep the old key name on older releases.
callbacks_enabled = timer, profile_tasks
# Verify SSH host keys before connecting
host_key_checking = True
[callback_profile_tasks]
# Profiling output settings
task_output_limit = 20
sort_order = descending
#!/usr/bin/env python3
# callback_plugins/audit_log.py
# 自定义审计日志插件
from ansible.plugins.callback import CallbackBase
import json
import datetime
import os
class CallbackModule(CallbackBase):
    """Audit-log callback plugin.

    Appends one JSON object per line to the audit log for playbook start, task
    start, task ok, task failed and the final stats. The log path comes from
    the ``ANSIBLE_AUDIT_LOG`` environment variable and defaults to
    ``/var/log/ansible-audit.log``.
    """
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'audit_log'

    def __init__(self):
        super(CallbackModule, self).__init__()
        self.audit_log_path = os.environ.get('ANSIBLE_AUDIT_LOG', '/var/log/ansible-audit.log')
        # Reference point for the duration reported in v2_playbook_on_stats
        self.start_time = datetime.datetime.now()

    def _write_audit_log(self, event_type, data):
        """Append one audit entry describing ``event_type`` with payload ``data``.

        Write errors are reported as an Ansible warning and never abort the run.
        """
        audit_entry = {
            'timestamp': datetime.datetime.now().isoformat(),
            'event_type': event_type,
            # _playbook_name is only set once v2_playbook_on_start has run
            'playbook': getattr(self, '_playbook_name', 'unknown'),
            'user': os.environ.get('USER', 'unknown'),
            'data': data
        }
        try:
            with open(self.audit_log_path, 'a') as f:
                # default=str: a value json cannot encode is stored as its
                # string form instead of causing the whole entry to be dropped
                f.write(json.dumps(audit_entry, default=str) + '\n')
        except Exception as e:
            self._display.warning(f"Failed to write audit log: {e}")

    def v2_playbook_on_start(self, playbook):
        """Record the start of a playbook and remember its file name."""
        self._playbook_name = os.path.basename(playbook._file_name)
        self._write_audit_log('playbook_start', {
            'playbook': self._playbook_name,
            'file_path': playbook._file_name
        })

    def v2_playbook_on_task_start(self, task, is_conditional):
        """Record the start of a task."""
        self._write_audit_log('task_start', {
            'task_name': task.get_name(),
            'task_action': task.action,
            'is_conditional': is_conditional
        })

    def v2_runner_on_ok(self, result):
        """Record a successful task result for one host."""
        # NOTE(review): the full result dict is written to the log; confirm it
        # cannot carry sensitive values in your environment.
        self._write_audit_log('task_ok', {
            'host': result._host.get_name(),
            'task_name': result._task.get_name(),
            'changed': result._result.get('changed', False),
            'result': result._result
        })

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Record a failed task result for one host."""
        self._write_audit_log('task_failed', {
            'host': result._host.get_name(),
            'task_name': result._task.get_name(),
            'error': result._result.get('msg', 'Unknown error'),
            'ignore_errors': ignore_errors,
            'result': result._result
        })

    def v2_playbook_on_stats(self, stats):
        """Record the per-host summary and the total run duration."""
        end_time = datetime.datetime.now()
        duration = (end_time - self.start_time).total_seconds()
        summary = {}
        for host in stats.processed:
            summary[host] = {
                'ok': stats.ok.get(host, 0),
                'changed': stats.changed.get(host, 0),
                'unreachable': stats.dark.get(host, 0),
                'failed': stats.failures.get(host, 0),
                'skipped': stats.skipped.get(host, 0)
            }
        self._write_audit_log('playbook_end', {
            'duration_seconds': duration,
            'summary': summary
        })
9.4 性能优化
9.4.1 并行执行优化
# playbooks/performance.yml
# Performance optimisation examples
---
- name: Performance optimization examples
  hosts: webservers
  strategy: free  # hosts do not wait for each other between tasks
  serial: 5  # handle five hosts per batch
  max_fail_percentage: 20  # tolerate up to 20% failed hosts
  vars:
    # Connection tuning
    ansible_ssh_pipelining: true
    # NOTE(review): ansible_ssh_multiplexing does not look like a documented
    # connection variable - confirm it has any effect.
    ansible_ssh_multiplexing: true
  tasks:
    # 1. Gather only the facts that are needed
    - name: Skip fact gathering for simple tasks
      setup:
        gather_subset:
          - '!all'
          - '!any'
          - network
      when: minimal_facts | default(false)
      tags: [facts]

    # 2. Start a long-running operation asynchronously
    - name: Long running task (async)
      command: /usr/bin/long_running_script.sh
      async: 300  # allow at most five minutes
      poll: 0  # do not wait for completion here
      register: long_task
      tags: [async]

    # 3. One module call with a list instead of a loop
    - name: Install multiple packages at once
      package:
        name: "{{ packages }}"
        state: present
      vars:
        packages:
          - nginx
          - mysql-server
          - redis-server
          - nodejs
      tags: [packages]

    # 4. A read-only check never reports "changed"; rc 0 and 3 are accepted
    - name: Check service status
      command: systemctl is-active nginx
      register: nginx_status
      changed_when: false
      failed_when: nginx_status.rc not in [0, 3]
      tags: [status]

    # 5. Run a task only where it applies
    - name: Configure firewall (only if needed)
      firewalld:
        service: http
        permanent: true
        state: enabled
      when:
        - firewall_enabled | default(true)
        - ansible_os_family == 'RedHat'
      tags: [firewall]

    # 6. Collect the result of the async job started in step 2
    - name: Wait for long running task
      async_status:
        jid: "{{ long_task.ansible_job_id }}"
      register: job_result
      until: job_result.finished
      retries: 30
      delay: 10
      when: long_task.ansible_job_id is defined
      tags: [async]
9.4.2 连接优化
# ansible.cfg
# Connection tuning
[defaults]
# NOTE(review): disabling host key checking (together with
# UserKnownHostsFile=/dev/null below) removes protection against
# man-in-the-middle attacks - confirm this is acceptable for your network.
host_key_checking = False
# More parallel connections
forks = 20
# Connection timeout in seconds
timeout = 30
# Disable cowsay
nocows = 1
# Fact caching
gathering = smart
fact_caching = jsonfile
fact_caching_connection = /tmp/ansible_facts_cache
fact_caching_timeout = 86400
[ssh_connection]
# ssh_args and pipelining are settings of the ssh connection plugin and are
# read from this section, not from [defaults].
ssh_args = -o ControlMaster=auto -o ControlPersist=60s -o UserKnownHostsFile=/dev/null -o IdentitiesOnly=yes
# Enable SSH pipelining
pipelining = True
# SSH connection reuse
control_path_dir = ~/.ansible/cp
control_path = %(directory)s/%%h-%%p-%%r
# File transfer
transfer_method = smart
retries = 3
9.4.3 内存和 CPU 优化
# playbooks/resource_optimization.yml
# Resource optimisation examples
---
- name: Resource optimization
  hosts: all
  vars:
    # Limit the hosts handled at once to a quarter of the play, rounded up
    batch_size: "{{ (ansible_play_hosts | length / 4) | round(0, 'ceil') | int }}"
  serial: "{{ batch_size }}"
  tasks:
    # 1. lineinfile instead of template for a single-line change
    - name: Update configuration (efficient)
      lineinfile:
        path: /etc/myapp/config.conf
        regexp: '^max_connections='
        line: "max_connections={{ max_connections | default(100) }}"
      tags: [config]

    # 2. Render several templates with one looped task
    - name: Generate config files efficiently
      template:
        src: "{{ item.src }}"
        dest: "{{ item.dest }}"
      loop:
        - { src: 'nginx.conf.j2', dest: '/etc/nginx/nginx.conf' }
        - { src: 'php.ini.j2', dest: '/etc/php/7.4/fpm/php.ini' }
      tags: [templates]

    # 3. copy with inline content for a small file
    # NOTE(review): `environment` is also the name of an Ansible play/task
    # keyword and may not resolve to the inventory value inside a task - confirm.
    - name: Create small config file
      copy:
        content: |
          # Generated by Ansible
          server_name={{ inventory_hostname }}
          environment={{ environment }}
          debug={{ debug_mode | default(false) | lower }}
        dest: /etc/myapp/server.conf
        mode: '0644'
      tags: [config]

    # 4. Batch file operations
    - name: Create multiple directories
      file:
        path: "{{ item }}"
        state: directory
        mode: '0755'
      loop:
        - /var/log/myapp
        - /var/lib/myapp
        - /etc/myapp/conf.d
        - /opt/myapp/bin
      tags: [directories]

    # 5. shell for multi-command work (use sparingly).
    # The former `args: warn: false` is dropped: current shell/command modules
    # no longer accept a `warn` argument and reject it as unsupported.
    - name: Complex file operations
      shell: |
        find /var/log/myapp -name "*.log" -mtime +7 -delete
        find /tmp -name "myapp_*" -mtime +1 -delete
      changed_when: false
      tags: [cleanup]
9.5 自定义模块和插件
9.5.1 自定义模块开发
#!/usr/bin/python
# library/custom_service.py
# 自定义服务管理模块
from ansible.module_utils.basic import AnsibleModule
import subprocess
import json
# Module documentation in the YAML layout used for Ansible module docs.
# The nesting is significant: each option's fields are indented under it.
DOCUMENTATION = '''
---
module: custom_service
short_description: Custom service management module
description:
  - Manages services with additional features
  - Supports health checks and graceful restarts
version_added: "1.0"
author: "DevOps Team"
options:
  name:
    description:
      - Name of the service
    required: true
    type: str
  state:
    description:
      - Desired state of the service
    choices: ['started', 'stopped', 'restarted', 'reloaded']
    default: started
    type: str
  enabled:
    description:
      - Whether the service should start on boot
    type: bool
    default: true
  health_check_url:
    description:
      - URL to check service health
    type: str
  health_check_timeout:
    description:
      - Timeout for health check in seconds
    type: int
    default: 30
  graceful_timeout:
    description:
      - Timeout for graceful shutdown in seconds
    type: int
    default: 60
'''

# Usage examples
EXAMPLES = '''
# Start a service with health check
- custom_service:
    name: nginx
    state: started
    health_check_url: http://localhost/health

# Graceful restart with custom timeout
- custom_service:
    name: myapp
    state: restarted
    graceful_timeout: 120
    health_check_url: http://localhost:8080/health
'''

# Description of the values the module returns
RETURN = '''
changed:
  description: Whether the service state was changed
  type: bool
  returned: always
state:
  description: Current state of the service
  type: str
  returned: always
health_status:
  description: Health check result
  type: str
  returned: when health_check_url is provided
'''
def run_command(module, cmd):
    """Run a system command and return ``(returncode, stdout, stderr)``.

    The command is executed WITHOUT a shell, so shell metacharacters in an
    interpolated value (such as a service name) reach the program as literal
    arguments instead of being interpreted.

    Args:
        module: object providing ``fail_json(msg=...)``, used to report errors.
        cmd: command line as a string (tokenised with ``shlex.split``) or an
            already split sequence of arguments.

    Returns:
        tuple: ``(returncode, stdout, stderr)`` with text output.

    A timeout (30 seconds) or any other error while starting the command is
    reported through ``module.fail_json``.
    """
    import shlex

    try:
        argv = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)
        completed = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=30
        )
        return completed.returncode, completed.stdout, completed.stderr
    except subprocess.TimeoutExpired:
        module.fail_json(msg=f"Command timed out: {cmd}")
    except Exception as e:
        module.fail_json(msg=f"Command failed: {e}")
def check_service_status(module, service_name):
    """Return True when ``systemctl is-active`` prints ``active`` for the service."""
    _, output, _ = run_command(module, f"systemctl is-active {service_name}")
    reported_state = output.strip()
    return reported_state == 'active'
def check_service_enabled(module, service_name):
    """Return True when ``systemctl is-enabled`` prints ``enabled`` for the service."""
    _, output, _ = run_command(module, f"systemctl is-enabled {service_name}")
    reported_state = output.strip()
    return reported_state == 'enabled'
def health_check(module, url, timeout):
    """Request ``url`` and report whether it answered with HTTP 200.

    Args:
        module: unused here; kept so all helpers share the same first argument.
        url: URL to request.
        timeout: timeout in seconds passed to ``urlopen``.

    Returns:
        tuple: ``(True, "HTTP 200")`` on a 200 response, ``(False, "HTTP <code>")``
        for another returned status, or ``(False, <error text>)`` when the
        request raised an exception.
    """
    try:
        import urllib.request

        request = urllib.request.Request(url)
        # The context manager closes the connection once the status is read
        with urllib.request.urlopen(request, timeout=timeout) as response:
            status = response.getcode()
        return status == 200, f"HTTP {status}"
    except Exception as e:
        # Covers URLError/HTTPError as well as malformed URLs and timeouts
        return False, str(e)
def manage_service(module):
    """Bring the service into the requested state and return the result dict.

    Reads ``name``, ``state``, ``enabled``, ``health_check_url`` and
    ``health_check_timeout`` from ``module.params``.

    Returns:
        dict: ``changed``, ``name``, ``state`` and ``enabled``, plus
        ``health_status`` / ``health_message`` when a health check ran.

    Any failing systemctl call or failed health check is reported through
    ``module.fail_json``.
    """
    name = module.params['name']
    state = module.params['state']
    enabled = module.params['enabled']
    health_check_url = module.params['health_check_url']
    health_check_timeout = module.params['health_check_timeout']
    # NOTE(review): the graceful_timeout parameter is accepted by the module
    # but nothing in this function uses it.

    changed = False
    result = {
        'changed': False,
        'name': name,
        'state': state
    }

    # Current state
    is_active = check_service_status(module, name)
    is_enabled = check_service_enabled(module, name)

    # Start-on-boot setting
    if enabled != is_enabled:
        action = 'enable' if enabled else 'disable'
        rc, stdout, stderr = run_command(module, f"systemctl {action} {name}")
        if rc != 0:
            module.fail_json(msg=f"Failed to {action} service: {stderr}")
        changed = True

    # Running state
    if state == 'started' and not is_active:
        rc, stdout, stderr = run_command(module, f"systemctl start {name}")
        if rc != 0:
            module.fail_json(msg=f"Failed to start service: {stderr}")
        changed = True
    elif state == 'stopped' and is_active:
        rc, stdout, stderr = run_command(module, f"systemctl stop {name}")
        if rc != 0:
            module.fail_json(msg=f"Failed to stop service: {stderr}")
        changed = True
    elif state == 'restarted':
        # A stopped service is simply started
        verb = 'restart' if is_active else 'start'
        rc, stdout, stderr = run_command(module, f"systemctl {verb} {name}")
        if rc != 0:
            module.fail_json(msg=f"Failed to restart service: {stderr}")
        changed = True
    elif state == 'reloaded' and is_active:
        rc, stdout, stderr = run_command(module, f"systemctl reload {name}")
        if rc != 0:
            # Reload failed: fall back to a restart
            rc, stdout, stderr = run_command(module, f"systemctl restart {name}")
            if rc != 0:
                module.fail_json(msg=f"Failed to reload/restart service: {stderr}")
        changed = True

    # Health check
    if health_check_url and (state in ['started', 'restarted', 'reloaded']):
        import time
        time.sleep(2)  # give the service a moment to come up
        health_ok, health_msg = health_check(module, health_check_url, health_check_timeout)
        result['health_status'] = 'healthy' if health_ok else 'unhealthy'
        result['health_message'] = health_msg
        if not health_ok:
            # Report `changed` too: the service may already have been modified
            module.fail_json(msg=f"Service health check failed: {health_msg}",
                             changed=changed)

    # Final state as reported by systemctl
    result['state'] = 'started' if check_service_status(module, name) else 'stopped'
    result['enabled'] = check_service_enabled(module, name)
    result['changed'] = changed
    return result
def main():
    """Module entry point: parse arguments, honour check mode, run the logic."""
    argument_spec = dict(
        name=dict(type='str', required=True),
        state=dict(type='str', default='started',
                   choices=['started', 'stopped', 'restarted', 'reloaded']),
        enabled=dict(type='bool', default=True),
        health_check_url=dict(type='str'),
        health_check_timeout=dict(type='int', default=30),
        graceful_timeout=dict(type='int', default=60)
    )
    module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)

    if module.check_mode:
        # Check mode: report the requested state without touching the system
        module.exit_json(
            changed=False,
            name=module.params['name'],
            state=module.params['state']
        )

    try:
        outcome = manage_service(module)
        module.exit_json(**outcome)
    except Exception as e:
        module.fail_json(msg=f"Unexpected error: {str(e)}")


if __name__ == '__main__':
    main()
9.5.2 自定义过滤器插件
# filter_plugins/custom_filters.py
# 自定义过滤器插件
import re
import base64
import hashlib
import json
from datetime import datetime, timedelta
class FilterModule(object):
    """Custom Jinja2 filters made available to playbooks and templates."""

    def filters(self):
        """Return the mapping of filter name to implementing method."""
        # NOTE(review): Ansible ships a built-in filter that is also called
        # to_nice_json - confirm that replacing it here is intended.
        return {
            'to_nice_json': self.to_nice_json,
            'mask_sensitive': self.mask_sensitive,
            'generate_password': self.generate_password,
            'validate_email': self.validate_email,
            'format_bytes': self.format_bytes,
            'time_ago': self.time_ago,
            'extract_domain': self.extract_domain,
            'safe_filename': self.safe_filename,
            'merge_dicts': self.merge_dicts,
            'flatten_list': self.flatten_list
        }

    def to_nice_json(self, data, indent=2):
        """Serialise ``data`` as indented JSON with sorted keys.

        Returns an ``"Error formatting JSON: ..."`` string when ``data`` cannot
        be serialised.
        """
        try:
            return json.dumps(data, indent=indent, sort_keys=True, ensure_ascii=False)
        except (TypeError, ValueError) as e:
            return f"Error formatting JSON: {e}"

    def mask_sensitive(self, value, mask_char='*', visible_chars=4):
        """Mask the middle of ``value``, keeping ``visible_chars`` characters.

        The visible characters are split between the start and the end. Values
        that are not strings, or not longer than ``visible_chars``, are
        replaced by eight mask characters.
        """
        if not isinstance(value, str) or len(value) <= visible_chars:
            return mask_char * 8
        visible_start = visible_chars // 2
        visible_end = visible_chars - visible_start
        masked_length = len(value) - visible_chars
        head = value[:visible_start]
        # value[-0:] would be the whole string, so guard the zero case; the
        # condition applies to the tail only, not to the whole concatenation.
        tail = value[-visible_end:] if visible_end > 0 else ''
        return head + mask_char * masked_length + tail

    def generate_password(self, length=12, include_symbols=True):
        """Generate a random password of ``length`` characters.

        As a Jinja2 filter the piped value becomes ``length``, e.g.
        ``{{ 16 | generate_password(true) }}``.

        Characters are drawn with the ``secrets`` module, which is meant for
        security-sensitive randomness.
        """
        import secrets
        import string
        chars = string.ascii_letters + string.digits
        if include_symbols:
            chars += '!@#$%^&*()_+-=[]{}|;:,.<>?'
        return ''.join(secrets.choice(chars) for _ in range(length))

    def validate_email(self, email):
        """Return True when ``email`` looks like ``local@domain.tld``.

        Non-string input is reported as invalid instead of raising.
        """
        if not isinstance(email, str):
            return False
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return bool(re.match(pattern, email))

    def format_bytes(self, bytes_value, precision=2):
        """Format a byte count using 1024-based units from B up to PB.

        Returns ``"Invalid input"`` when the value cannot be read as a number.
        """
        try:
            bytes_value = float(bytes_value)
        except (TypeError, ValueError):
            return "Invalid input"
        units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
        unit_index = 0
        while bytes_value >= 1024 and unit_index < len(units) - 1:
            bytes_value /= 1024
            unit_index += 1
        return f"{bytes_value:.{precision}f} {units[unit_index]}"

    def time_ago(self, timestamp, format='%Y-%m-%d %H:%M:%S'):
        """Describe how long ago ``timestamp`` was.

        ``timestamp`` is either a string parsed with ``format`` or a numeric
        epoch value. Timestamps less than a minute old, or in the future, give
        ``"Just now"``. Other input types give ``"Invalid timestamp"``; parse
        errors give ``"Error: ..."``.
        """
        try:
            if isinstance(timestamp, str):
                dt = datetime.strptime(timestamp, format)
            elif isinstance(timestamp, (int, float)):
                dt = datetime.fromtimestamp(timestamp)
            else:
                return "Invalid timestamp"
            # Work on the total elapsed seconds: timedelta.days/.seconds are
            # normalised so that a negative difference has a positive .seconds,
            # which would report future timestamps as hours "ago".
            elapsed = int((datetime.now() - dt).total_seconds())
            if elapsed >= 86400:
                return f"{elapsed // 86400} days ago"
            if elapsed >= 3600:
                return f"{elapsed // 3600} hours ago"
            if elapsed >= 60:
                return f"{elapsed // 60} minutes ago"
            return "Just now"
        except Exception as e:
            return f"Error: {e}"

    def extract_domain(self, url_or_email):
        """Extract the host part from a URL or the domain from an e-mail address.

        Input that is neither is returned unchanged.
        """
        # URLs first: a URL may contain '@' in its path or in its userinfo part
        match = re.search(r'https?://([^/]+)', url_or_email)
        if match:
            # Drop a leading "user:password@" if present
            return match.group(1).rsplit('@', 1)[-1]
        # E-mail address
        if '@' in url_or_email:
            return url_or_email.split('@')[-1]
        # Possibly already a bare domain
        return url_or_email

    def safe_filename(self, filename):
        """Turn ``filename`` into a name made only of ``[a-zA-Z0-9._-]``."""
        # Replace every other character with an underscore
        safe_chars = re.sub(r'[^a-zA-Z0-9._-]', '_', filename)
        # Collapse runs of underscores
        safe_chars = re.sub(r'_+', '_', safe_chars)
        # Trim underscores at both ends
        return safe_chars.strip('_')

    def merge_dicts(self, dict1, dict2, deep=True):
        """Merge ``dict2`` into a copy of ``dict1``; values from ``dict2`` win.

        With ``deep=True`` nested dicts are merged recursively. When either
        argument is not a dict, ``dict2`` is returned as is.
        """
        if not isinstance(dict1, dict) or not isinstance(dict2, dict):
            return dict2
        result = dict1.copy()
        for key, value in dict2.items():
            if key in result and deep and isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = self.merge_dicts(result[key], value, deep)
            else:
                result[key] = value
        return result

    def flatten_list(self, nested_list):
        """Flatten arbitrarily nested lists into a single flat list."""
        def _flatten(lst):
            for item in lst:
                if isinstance(item, list):
                    yield from _flatten(item)
                else:
                    yield item
        return list(_flatten(nested_list))
9.5.3 使用自定义模块和过滤器
# playbooks/custom_modules_demo.yml
# Example use of the custom module and the custom filters
---
- name: Custom modules and filters demo
  hosts: webservers
  vars:
    sensitive_data:
      api_key: "abc123def456ghi789jkl012"
      database_password: "super_secret_password_123"
    user_data:
      name: "John Doe"
      email: "john.doe@example.com"
      created_at: "2023-01-15 10:30:00"
    file_sizes:
      - 1024
      - 2048576
      - 1073741824
    nested_config:
      database:
        host: localhost
        port: 5432
      cache:
        enabled: true
        ttl: 3600
  tasks:
    # Custom service module
    - name: Manage nginx with health check
      custom_service:
        name: nginx
        state: started
        enabled: true
        health_check_url: "http://{{ inventory_hostname }}/health"
        health_check_timeout: 30
      tags: [service]

    # Custom filters
    - name: Display masked sensitive data
      debug:
        msg: |
          API Key: {{ sensitive_data.api_key | mask_sensitive }}
          DB Password: {{ sensitive_data.database_password | mask_sensitive(visible_chars=6) }}
      tags: [debug, security]

    - name: Validate and extract email domain
      debug:
        msg: |
          Email: {{ user_data.email }}
          Valid: {{ user_data.email | validate_email }}
          Domain: {{ user_data.email | extract_domain }}
      tags: [debug, email]

    - name: Format file sizes
      debug:
        msg: "File {{ item }} bytes = {{ item | format_bytes }}"
      loop: "{{ file_sizes }}"
      tags: [debug, files]

    - name: Show time ago
      debug:
        msg: "User created: {{ user_data.created_at | time_ago }}"
      tags: [debug, time]

    - name: Generate configuration with nice JSON
      copy:
        content: "{{ nested_config | to_nice_json }}"
        dest: "/tmp/config.json"
        mode: '0644'
      tags: [config]

    - name: Create safe filename
      debug:
        msg: "Safe filename: {{ 'My File (2023-01-15) [FINAL].txt' | safe_filename }}"
      tags: [debug, filename]

    # The piped value is the filter's first argument (the length); piping an
    # empty string and passing two more arguments would be one too many.
    - name: Generate random password
      debug:
        msg: "Generated password: {{ 16 | generate_password(true) }}"
      tags: [debug, password]
      no_log: true  # keep the password out of the logs
9.6 本章总结
本章介绍了 Ansible 的高级特性和技巧:
- Vault 加密管理:保护敏感数据的安全性
- 动态包含和导入:提高 Playbook 的灵活性和可维护性
- 错误处理和调试:增强 Playbook 的健壮性和可调试性
- 性能优化:提高 Ansible 执行效率
- 自定义模块和插件:扩展 Ansible 功能
这些高级特性使 Ansible 能够应对复杂的自动化场景,提供企业级的解决方案。
9.7 练习题
基础练习
Vault 使用
- 创建加密的变量文件
- 在 Playbook 中使用加密变量
- 实现多环境的密钥管理
动态包含
- 实现基于操作系统的动态任务包含
- 创建条件性的角色包含
- 设计灵活的 Playbook 结构
进阶练习
错误处理
- 实现复杂的错误处理逻辑
- 设计回滚机制
- 添加详细的日志和审计功能
性能优化
- 优化大规模部署的性能
- 实现智能的批处理策略
- 分析和解决性能瓶颈
实战练习
自定义模块开发
- 开发特定业务需求的自定义模块
- 实现完整的错误处理和文档
- 编写模块测试用例
企业级解决方案
- 设计完整的企业级自动化方案
- 集成监控、日志和安全功能
- 实现 CI/CD 流水线集成
下一章:第10章:实战项目案例
返回目录:Ansible 自动化运维教程