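7.1 Inventory Overview
7.1.1 What Is an Inventory
The inventory is one of Ansible's core components: it defines the hosts and host groups that Ansible can manage. Beyond a list of hosts, an inventory can carry variables, connection parameters, and the structure in which hosts are organized.
An inventory is used to:
- define target hosts and host groups
- store host-specific variables and connection parameters
- organize hosts into a hierarchy
- support dynamic host discovery
- provide the basis for host filtering and selection
7.1.2 Inventory Types
# Static inventory
# A manually maintained host list, usually stored in a file
# Dynamic inventory
# Hosts discovered automatically by a script or plugin
# Mixed inventory
# A combination of static and dynamic sources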
7.2 Static Inventory
7.2.1 INI-Format Inventory
# inventory/hosts.ini
# Basic host definitions
[webservers]
web1.example.com
web2.example.com
web3.example.com
[databases]
db1.example.com
db2.example.com
[loadbalancers]
lb1.example.com
lb2.example.com
# Using IP addresses
[cache_servers]
192.168.1.10
192.168.1.11
192.168.1.12
# Specifying a port and connection parameters
[app_servers]
app1.example.com ansible_port=2222 ansible_user=deploy
app2.example.com ansible_port=2222 ansible_user=deploy
app3.example.com ansible_port=2222 ansible_user=deploy
# Host ranges (both ends are inclusive; leading zeros are kept)
[web_cluster]
web[01:10].example.com
[db_cluster]
db[a:f].example.com
# Host aliases
[monitoring]
jumpbox ansible_host=jump.example.com ansible_user=admin
logserver ansible_host=logs.example.com ansible_port=2222
# Groups of groups (parent/child relationships)
[production:children]
webservers
databases
loadbalancers
[staging:children]
staging_web
staging_db
[staging_web]
staging-web1.example.com
staging-web2.example.com
[staging_db]
staging-db1.example.com
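# Group variables
# Note: values in a :vars section are read as strings, so "true" below is the string 'true', not a boolean
[webservers:vars]
http_port=80
https_port=443
nginx_worker_processes=auto
[databases:vars]
mysql_port=3306
# Example only: keep real passwords in Ansible Vault rather than in plain text
mysql_root_password=secret
max_connections=200
[production:vars]
# Note: "environment" is also a playbook keyword, so Ansible may warn about this variable name
environment=production
log_level=warning
backup_enabled=true
[staging:vars]
environment=staging
log_level=debug
backup_enabled=false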
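The host ranges used above (`web[01:10]`, `db[a:f]`) can be pictured as a simple expansion. The following is a minimal sketch of that idea, not Ansible's own parser: `expand_host_range` is a hypothetical helper, and it assumes a single range per pattern and single lowercase letters for alphabetic ranges.

```python
import re
import string
from typing import List

# Matches one [start:end] range such as [01:10] or [a:f]
RANGE_RE = re.compile(r"\[(\w+):(\w+)\]")


def expand_host_range(pattern: str) -> List[str]:
    """Expand a single inclusive [start:end] range in a host pattern."""
    match = RANGE_RE.search(pattern)
    if match is None:
        return [pattern]
    start, end = match.group(1), match.group(2)
    if start.isdigit() and end.isdigit():
        # Pad to the width of the start value so that 01 yields 01, 02, ...
        values = [str(n).zfill(len(start)) for n in range(int(start), int(end) + 1)]
    else:
        letters = string.ascii_lowercase
        values = list(letters[letters.index(start):letters.index(end) + 1])
    prefix, suffix = pattern[:match.start()], pattern[match.end():]
    return [f"{prefix}{value}{suffix}" for value in values]


if __name__ == "__main__":
    for host in expand_host_range("web[01:10].example.com"):
        print(host)
```

To see what Ansible itself produces for a real inventory file, `ansible-inventory -i inventory/hosts.ini --list` prints the expanded host list.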
7.2.2 YAML-Format Inventory
# inventory/hosts.yml
all:
  children:
    production:
      children:
        webservers:
          hosts:
            web1.example.com:
              ansible_host: 10.0.1.10
              server_role: primary
            web2.example.com:
              ansible_host: 10.0.1.11
              server_role: secondary
            web3.example.com:
              ansible_host: 10.0.1.12
              server_role: secondary
          vars:
            http_port: 80
            https_port: 443
            nginx_worker_processes: auto
            environment: production
        databases:
          hosts:
            db1.example.com:
              ansible_host: 10.0.2.10
              mysql_server_id: 1
              mysql_role: master
            db2.example.com:
              ansible_host: 10.0.2.11
              mysql_server_id: 2
              mysql_role: slave
          vars:
            mysql_port: 3306
            mysql_root_password: "{{ vault_mysql_root_password }}"
            max_connections: 200
            innodb_buffer_pool_size: 1G
        loadbalancers:
          hosts:
            lb1.example.com:
              ansible_host: 10.0.3.10
              priority: 100
            lb2.example.com:
              ansible_host: 10.0.3.11
              priority: 90
          vars:
            keepalived_interface: eth0
            virtual_ip: 10.0.3.100
      vars:
        environment: production
        log_level: warning
        backup_enabled: true
        monitoring_enabled: true
    staging:
      children:
        staging_web:
          hosts:
            staging-web1.example.com:
              ansible_host: 10.0.10.10
            staging-web2.example.com:
              ansible_host: 10.0.10.11
          vars:
            http_port: 8080
            https_port: 8443
        staging_db:
          hosts:
            staging-db1.example.com:
              ansible_host: 10.0.11.10
          vars:
            mysql_port: 3306
      vars:
        environment: staging
        log_level: debug
        backup_enabled: false
        monitoring_enabled: false
    development:
      hosts:
        dev1.example.com:
          ansible_host: 192.168.1.100
          ansible_user: developer
        dev2.example.com:
          ansible_host: 192.168.1.101
          ansible_user: developer
      vars:
        environment: development
        log_level: debug
        backup_enabled: false
    # Special-purpose hosts
    monitoring:
      hosts:
        prometheus.example.com:
          ansible_host: 10.0.4.10
          prometheus_port: 9090
        grafana.example.com:
          ansible_host: 10.0.4.11
          grafana_port: 3000
        elk.example.com:
          ansible_host: 10.0.4.12
          elasticsearch_port: 9200
          kibana_port: 5601
      vars:
        monitoring_retention_days: 30
        alert_email: admin@example.com
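Because a YAML inventory is a tree of `children` and `hosts`, the members of a parent group are the union of the hosts in every group beneath it. The sketch below illustrates that with a trimmed copy of the hierarchy written as a plain Python dict; `INVENTORY`, `find_group`, `collect_hosts`, and `hosts_in_group` are hypothetical names used only for this illustration.

```python
from typing import Any, Dict, Optional, Set

# A trimmed copy of the hierarchy above, written as a plain dict
INVENTORY: Dict[str, Any] = {
    "all": {
        "children": {
            "production": {
                "children": {
                    "webservers": {"hosts": {"web1.example.com": {}, "web2.example.com": {}}},
                    "databases": {"hosts": {"db1.example.com": {}}},
                },
            },
            "development": {"hosts": {"dev1.example.com": {}}},
        }
    }
}


def find_group(tree: Dict[str, Any], name: str) -> Optional[Dict[str, Any]]:
    """Depth-first search for a group by name."""
    for group_name, body in tree.items():
        body = body or {}
        if group_name == name:
            return body
        found = find_group(body.get("children") or {}, name)
        if found is not None:
            return found
    return None


def collect_hosts(group: Dict[str, Any]) -> Set[str]:
    """Return the group's own hosts plus the hosts of all descendant groups."""
    hosts = set((group.get("hosts") or {}).keys())
    for child in (group.get("children") or {}).values():
        hosts |= collect_hosts(child or {})
    return hosts


def hosts_in_group(tree: Dict[str, Any], name: str) -> Set[str]:
    group = find_group(tree, name)
    return collect_hosts(group) if group is not None else set()


if __name__ == "__main__":
    print(sorted(hosts_in_group(INVENTORY, "production")))
```

For a real inventory, `ansible-inventory -i inventory/hosts.yml --graph` shows the same hierarchy as Ansible resolves it.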
7.2.3 Host Connection Parameters
# inventory/connection-params.ini
[webservers]
# SSH connection parameters
web1.example.com ansible_host=10.0.1.10 ansible_port=22 ansible_user=ubuntu
web2.example.com ansible_host=10.0.1.11 ansible_port=2222 ansible_user=centos
# SSH key authentication
web3.example.com ansible_host=10.0.1.12 ansible_ssh_private_key_file=~/.ssh/web3_key
# SSH password authentication (not recommended; plain text shown for illustration only)
web4.example.com ansible_host=10.0.1.13 ansible_ssh_pass=password123
# Connecting through a jump host
web5.example.com ansible_host=10.0.1.14 ansible_ssh_common_args='-o ProxyCommand="ssh -W %h:%p jump.example.com"'
# Python interpreter path
web6.example.com ansible_host=10.0.1.15 ansible_python_interpreter=/usr/bin/python3
[windows_servers]
# Windows hosts over WinRM (store real passwords in Ansible Vault)
win1.example.com ansible_host=10.0.2.10 ansible_user=Administrator ansible_password=WinPass123 ansible_connection=winrm ansible_winrm_transport=basic
win2.example.com ansible_host=10.0.2.11 ansible_user=Administrator ansible_connection=winrm ansible_winrm_server_cert_validation=ignore
[docker_containers]
# Docker containers
app_container ansible_connection=docker ansible_docker_extra_args="-H=tcp://docker.example.com:2376"
db_container ansible_connection=docker ansible_user=root
[local_tasks]
# Local execution
localhost ansible_connection=local ansible_python_interpreter=/usr/bin/python3
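Each host line above is a host name followed by `key=value` pairs, with shell-style quoting for values that contain spaces. The sketch below shows one way such a line could be split; `parse_host_line` is a hypothetical helper, and Ansible's real INI parser handles many more cases. It mirrors one documented behavior: inline values are read as Python literals where possible, so `ansible_port=2222` becomes an integer.

```python
import ast
import shlex
from typing import Any, Dict, Tuple


def parse_host_line(line: str) -> Tuple[str, Dict[str, Any]]:
    """Split an INI host line into (host name, variables)."""
    # shlex keeps quoted values such as ansible_ssh_common_args='...' in one token
    tokens = shlex.split(line, comments=True)
    name, variables = tokens[0], {}
    for token in tokens[1:]:
        key, _, raw = token.partition("=")
        try:
            value = ast.literal_eval(raw)  # numbers, booleans, lists, ...
        except (ValueError, SyntaxError):
            value = raw  # anything else stays a string
        variables[key] = value
    return name, variables


if __name__ == "__main__":
    print(parse_host_line("app1.example.com ansible_port=2222 ansible_user=deploy"))
```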
7.3 Dynamic Inventory
7.3.1 Dynamic Inventory Scripts
#!/usr/bin/env python3
# inventory/dynamic_inventory.py
import argparse
import json
import sys
from typing import Any, Dict, List

import requests


class DynamicInventory:
    def __init__(self):
        self.read_cli_args()
        # Dispatch on the arguments Ansible passes to inventory scripts
        if self.args.list:
            self.inventory = self.get_inventory()
        elif self.args.host:
            self.inventory = self.get_host_vars(self.args.host)
        else:
            self.inventory = self.empty_inventory()
        print(json.dumps(self.inventory, indent=2))

    def read_cli_args(self):
        parser = argparse.ArgumentParser()
        parser.add_argument('--list', action='store_true')
        parser.add_argument('--host', action='store')
        self.args = parser.parse_args()

    def get_inventory(self) -> Dict[str, Any]:
        """Build the complete inventory."""
        inventory = {
            '_meta': {
                'hostvars': {}
            }
        }
        # Fetch host records from a CMDB or cloud platform API
        hosts_data = self.fetch_hosts_from_api()
        # Group hosts by role
        for host_info in hosts_data:
            hostname = host_info['hostname']
            role = host_info.get('role', 'ungrouped')
            environment = host_info.get('environment', 'unknown')
            # Create the role group
            if role not in inventory:
                inventory[role] = {
                    'hosts': [],
                    'vars': {}
                }
            inventory[role]['hosts'].append(hostname)
            # Create the environment-and-role group
            env_group = f"{environment}_{role}"
            if env_group not in inventory:
                inventory[env_group] = {
                    'hosts': [],
                    'vars': {
                        'environment': environment
                    }
                }
            inventory[env_group]['hosts'].append(hostname)
            # Set host variables
            inventory['_meta']['hostvars'][hostname] = {
                'ansible_host': host_info.get('ip_address'),
                'ansible_user': host_info.get('ssh_user', 'ubuntu'),
                'server_role': role,
                'environment': environment,
                'instance_type': host_info.get('instance_type'),
                'region': host_info.get('region'),
                'tags': host_info.get('tags', {})
            }
        # Create parent groups
        self.create_parent_groups(inventory)
        return inventory

    def fetch_hosts_from_api(self) -> List[Dict[str, Any]]:
        """Fetch host records from an API."""
        # Example: query a CMDB API for hosts
        try:
            response = requests.get(
                'https://cmdb.example.com/api/hosts',
                headers={'Authorization': 'Bearer YOUR_API_TOKEN'},
                timeout=30
            )
            response.raise_for_status()
            return response.json()['hosts']
        except requests.RequestException as e:
            # If the API is unavailable, fall back to static data.
            # Warnings go to stderr so that stdout contains only JSON.
            print(f"Warning: CMDB API unavailable, using fallback hosts: {e}", file=sys.stderr)
            return self.get_fallback_hosts()

    def get_fallback_hosts(self) -> List[Dict[str, Any]]:
        """Fallback host data for when the API is unavailable."""
        return [
            {
                'hostname': 'web1.example.com',
                'ip_address': '10.0.1.10',
                'role': 'webserver',
                'environment': 'production',
                'instance_type': 't3.medium',
                'region': 'us-east-1',
                'ssh_user': 'ubuntu',
                'tags': {'Team': 'DevOps', 'Project': 'WebApp'}
            },
            {
                'hostname': 'web2.example.com',
                'ip_address': '10.0.1.11',
                'role': 'webserver',
                'environment': 'production',
                'instance_type': 't3.medium',
                'region': 'us-east-1',
                'ssh_user': 'ubuntu',
                'tags': {'Team': 'DevOps', 'Project': 'WebApp'}
            },
            {
                'hostname': 'db1.example.com',
                'ip_address': '10.0.2.10',
                'role': 'database',
                'environment': 'production',
                'instance_type': 't3.large',
                'region': 'us-east-1',
                'ssh_user': 'ubuntu',
                'tags': {'Team': 'DBA', 'Project': 'WebApp'}
            }
        ]

    def create_parent_groups(self, inventory: Dict[str, Any]):
        """Create one parent group per environment."""
        # Take environment names from the host variables rather than splitting
        # group names on "_", which misfires for roles such as "load_balancer"
        environments = {
            host_vars['environment']
            for host_vars in inventory['_meta']['hostvars'].values()
        }
        for env in environments:
            env_groups = [g for g in inventory if g.startswith(f"{env}_")]
            if env_groups:
                inventory[env] = {
                    'children': env_groups,
                    'vars': {
                        'environment': env
                    }
                }

    def get_host_vars(self, hostname: str) -> Dict[str, Any]:
        """Return the variables of a single host."""
        inventory = self.get_inventory()
        return inventory['_meta']['hostvars'].get(hostname, {})

    def empty_inventory(self) -> Dict[str, Any]:
        """Return an empty inventory."""
        return {'_meta': {'hostvars': {}}}


if __name__ == '__main__':
    DynamicInventory()
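A script like the one above must print JSON in the shape Ansible expects: a `_meta.hostvars` mapping plus one entry per group with optional `hosts`, `children`, and `vars` keys. The following sketch checks that shape before a script is wired into Ansible. `validate_inventory` is a hypothetical helper, and it is deliberately stricter than Ansible: for example, it reports child groups that are never defined, which usually points to a typo.

```python
from typing import Any, Dict, List


def validate_inventory(data: Dict[str, Any]) -> List[str]:
    """Return a list of structural problems found in --list output."""
    problems: List[str] = []
    meta = data.get("_meta")
    if not isinstance(meta, dict) or not isinstance(meta.get("hostvars"), dict):
        problems.append("_meta.hostvars is missing or not a mapping")
    for name, group in data.items():
        if name == "_meta":
            continue
        if not isinstance(group, dict):
            problems.append(f"group {name!r} is not a mapping")
            continue
        for key in ("hosts", "children"):
            if key in group and not isinstance(group[key], list):
                problems.append(f"group {name!r}: {key!r} is not a list")
        if "vars" in group and not isinstance(group["vars"], dict):
            problems.append(f"group {name!r}: 'vars' is not a mapping")
        children = group.get("children", [])
        if isinstance(children, list):
            for child in children:
                if child not in data:
                    problems.append(f"group {name!r}: child {child!r} is not defined")
    return problems


if __name__ == "__main__":
    sample = {
        "_meta": {"hostvars": {"web1.example.com": {"ansible_host": "10.0.1.10"}}},
        "webserver": {"hosts": ["web1.example.com"], "vars": {}},
        "production": {"children": ["webserver"]},
    }
    print(validate_inventory(sample) or "inventory structure looks fine")
```

In practice the input would come from running the script with `--list` and loading its output with `json.loads`.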
7.3.2 AWS EC2 Dynamic Inventory
#!/usr/bin/env python3
# inventory/aws_ec2_inventory.py
import argparse
import json
from typing import Any, Dict

import boto3


class EC2DynamicInventory:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.inventory = {
            '_meta': {
                'hostvars': {}
            }
        }
        self.read_cli_args()
        if self.args.list:
            self.inventory = self.get_inventory()
        elif self.args.host:
            self.inventory = self.get_host_vars(self.args.host)
        print(json.dumps(self.inventory, indent=2))

    def read_cli_args(self):
        parser = argparse.ArgumentParser()
        parser.add_argument('--list', action='store_true')
        parser.add_argument('--host', action='store')
        self.args = parser.parse_args()

    def get_inventory(self) -> Dict[str, Any]:
        """Build the inventory from AWS EC2."""
        inventory = {
            '_meta': {
                'hostvars': {}
            }
        }
        # Fetch all running EC2 instances; the paginator follows result pages,
        # which a single describe_instances call would not
        paginator = self.ec2.get_paginator('describe_instances')
        pages = paginator.paginate(
            Filters=[
                {
                    'Name': 'instance-state-name',
                    'Values': ['running']
                }
            ]
        )
        for page in pages:
            for reservation in page['Reservations']:
                for instance in reservation['Instances']:
                    hostname = self.get_hostname(instance)
                    if not hostname:
                        continue
                    # Group by tags
                    self.group_by_tags(inventory, instance, hostname)
                    # Group by instance type
                    instance_type = instance['InstanceType']
                    self.add_to_group(inventory, f"type_{instance_type}", hostname)
                    # Group by availability zone
                    az = instance['Placement']['AvailabilityZone']
                    self.add_to_group(inventory, f"az_{az}", hostname)
                    # Group by VPC
                    vpc_id = instance.get('VpcId', 'no-vpc')
                    self.add_to_group(inventory, f"vpc_{vpc_id}", hostname)
                    # Set host variables
                    inventory['_meta']['hostvars'][hostname] = {
                        'ansible_host': instance.get('PublicIpAddress', instance.get('PrivateIpAddress')),
                        'ansible_user': self.get_ssh_user(instance),
                        'ec2_instance_id': instance['InstanceId'],
                        'ec2_instance_type': instance['InstanceType'],
                        'ec2_availability_zone': az,
                        # Dropping the zone letter yields the region for standard zones
                        'ec2_region': az[:-1],
                        'ec2_vpc_id': vpc_id,
                        'ec2_subnet_id': instance.get('SubnetId'),
                        'ec2_private_ip': instance.get('PrivateIpAddress'),
                        'ec2_public_ip': instance.get('PublicIpAddress'),
                        'ec2_state': instance['State']['Name'],
                        'ec2_tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
                    }
        return inventory

    def get_hostname(self, instance: Dict[str, Any]) -> str:
        """Return the inventory host name for an instance."""
        # Prefer the Name tag
        for tag in instance.get('Tags', []):
            if tag['Key'] == 'Name':
                return tag['Value']
        # Fall back to the instance ID
        return instance['InstanceId']

    def get_ssh_user(self, instance: Dict[str, Any]) -> str:
        """Pick the SSH user from the AMI name."""
        # Look up the AMI (one API call per instance, which is slow for large fleets)
        try:
            ami_response = self.ec2.describe_images(ImageIds=[instance['ImageId']])
            if ami_response['Images']:
                ami_name = ami_response['Images'][0].get('Name', '').lower()
                if 'ubuntu' in ami_name:
                    return 'ubuntu'
                elif 'centos' in ami_name:
                    return 'centos'
                elif 'rhel' in ami_name or 'redhat' in ami_name:
                    return 'ec2-user'
                elif 'amazon' in ami_name:
                    return 'ec2-user'
        except Exception:
            pass
        return 'ec2-user'  # default user

    def group_by_tags(self, inventory: Dict[str, Any], instance: Dict[str, Any], hostname: str):
        """Create groups from instance tags."""
        for tag in instance.get('Tags', []):
            key = tag['Key']
            value = tag['Value']
            # Skip the Name tag
            if key == 'Name':
                continue
            # Group for the tag key and value
            group_name = f"tag_{key}_{value}".replace(' ', '_').replace('-', '_').lower()
            self.add_to_group(inventory, group_name, hostname)
            # Group for the tag key alone
            key_group = f"tag_{key}".replace(' ', '_').replace('-', '_').lower()
            self.add_to_group(inventory, key_group, hostname)

    def add_to_group(self, inventory: Dict[str, Any], group_name: str, hostname: str):
        """Add a host to a group, creating the group if needed."""
        if group_name not in inventory:
            inventory[group_name] = {
                'hosts': [],
                'vars': {}
            }
        if hostname not in inventory[group_name]['hosts']:
            inventory[group_name]['hosts'].append(hostname)

    def get_host_vars(self, hostname: str) -> Dict[str, Any]:
        """Return the variables of a single host."""
        inventory = self.get_inventory()
        return inventory['_meta']['hostvars'].get(hostname, {})


if __name__ == '__main__':
    EC2DynamicInventory()
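The `group_by_tags` method above replaces only spaces and hyphens, so a tag value containing other characters (for example `&` or `.`) would still produce an awkward group name. A slightly more defensive variant is sketched below. `tag_group_name` is a hypothetical helper, and the rule it applies (lowercase, with everything outside `a-z`, `0-9`, and `_` replaced by `_`) is an assumption chosen for illustration rather than Ansible's own sanitization.

```python
import re


def tag_group_name(key: str, value: str = "") -> str:
    """Build a group name from a tag key and optional value."""
    raw = f"tag_{key}_{value}" if value else f"tag_{key}"
    # Replace every character that is not a lowercase letter, digit, or underscore
    return re.sub(r"[^a-z0-9_]", "_", raw.lower())


if __name__ == "__main__":
    print(tag_group_name("Environment", "production"))
    print(tag_group_name("Cost Center", "R&D-42"))
```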
7.3.3 Using Inventory Plugins
# inventory/aws_ec2.yml
# AWS EC2 plugin configuration
plugin: amazon.aws.aws_ec2
regions:
  - us-east-1
  - us-west-2
filters:
  instance-state-name: running
  tag:Environment: [production, staging]
# Grouping
keyed_groups:
  # Group by environment
  - key: tags.Environment
    prefix: env
  # Group by instance type
  - key: instance_type
    prefix: type
  # Group by availability zone
  - key: placement.availability_zone
    prefix: az
  # Group by VPC
  - key: vpc_id
    prefix: vpc
# Host names
hostnames:
  - tag:Name
  - instance-id
# Composed variables
compose:
  ansible_host: public_ip_address | default(private_ip_address)
  # This expression assumes the host data exposes image.name;
  # where it does not, the result falls through to 'ec2-user'
  ansible_user: >
    'ubuntu' if (image.name | default('')) | regex_search('ubuntu', ignorecase=True)
    else 'centos' if (image.name | default('')) | regex_search('centos', ignorecase=True)
    else 'ec2-user'
  ec2_instance_type: instance_type
  ec2_region: placement.region
  ec2_availability_zone: placement.availability_zone
  ec2_vpc_id: vpc_id
  ec2_subnet_id: subnet_id
  ec2_private_ip: private_ip_address
  ec2_public_ip: public_ip_address
  ec2_state: state.name
# Caching (jsonfile persists to cache_connection; the memory plugin would not survive between runs)
cache: true
cache_plugin: jsonfile
cache_timeout: 3600
cache_connection: /tmp/ansible-inventory-cache
# inventory/azure_rm.yml
# Azure plugin configuration
plugin: azure.azcollection.azure_rm
auth_source: auto
include_vm_resource_groups:
  - production-rg
  - staging-rg
# Grouping
keyed_groups:
  # Group by resource group
  - key: resource_group
    prefix: rg
  # Group by location
  - key: location
    prefix: location
  # Group by VM size
  - key: vm_size
    prefix: size
  # Group by tags
  - key: tags.Environment
    prefix: env
  - key: tags.Role
    prefix: role
# Host names
hostnames:
  - name
  - public_ipv4_addresses
  - private_ipv4_addresses
# Composed variables
compose:
  ansible_host: public_ipv4_addresses[0] | default(private_ipv4_addresses[0])
  ansible_user: >
    'ubuntu' if (image.offer | default('')) | regex_search('ubuntu', ignorecase=True)
    else 'centos' if (image.offer | default('')) | regex_search('centos', ignorecase=True)
    else 'azureuser'
  azure_vm_size: vm_size
  azure_location: location
  azure_resource_group: resource_group
  azure_vm_id: vm_id
  azure_power_state: power_state
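7.4 Inventory Variable Management
7.4.1 Variable Precedence
# Variable precedence, lowest to highest (abridged from the Ansible documentation):
# 1. Role defaults (roles/x/defaults/main.yml)
# 2. Inventory group variables: the all group first, then parent groups, then child groups
# 3. Inventory host variables
# 4. Host facts and cached set_fact values
# 5. Play variables (vars, vars_prompt, vars_files)
# 6. Role variables (roles/x/vars/main.yml)
# 7. Block variables, then task variables
# 8. include_vars
# 9. set_fact and registered variables
# 10. Role parameters and include parameters
# 11. Extra variables (-e), which always take precedence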
# inventory/group_vars/all.yml
# Global variables (lowest precedence among inventory variables)
common_packages:
  - curl
  - wget
  - vim
  - htop
ntp_servers:
  - 0.pool.ntp.org
  - 1.pool.ntp.org
log_retention_days: 30
backup_retention_days: 7
# Security settings
ssh_port: 22
ssh_permit_root_login: false
ssh_password_authentication: false
# inventory/group_vars/production.yml
# Production environment variables
environment: production
log_level: warning
monitoring_enabled: true
backup_enabled: true
# Production-specific settings
max_connections: 1000
worker_processes: auto
cache_ttl: 3600
# Security settings
firewall_enabled: true
ssl_required: true
fail2ban_enabled: true
# Monitoring settings
prometheus_scrape_interval: 15s
alert_manager_enabled: true
log_shipping_enabled: true
# inventory/group_vars/webservers.yml
# Web server group variables
http_port: 80
https_port: 443
nginx_worker_processes: auto
nginx_worker_connections: 1024
# SSL settings
ssl_certificate_path: /etc/ssl/certs
ssl_private_key_path: /etc/ssl/private
ssl_protocols:
  - TLSv1.2
  - TLSv1.3
# Cache settings
nginx_proxy_cache_path: /var/cache/nginx
nginx_proxy_cache_levels: "1:2"
nginx_proxy_cache_max_size: 1g
# Log settings
nginx_access_log: /var/log/nginx/access.log
nginx_error_log: /var/log/nginx/error.log
log_format: combined
# inventory/host_vars/web1.example.com.yml
# Host-specific variables (these override group variables)
server_role: primary
server_weight: 100
# Host-specific settings
nginx_worker_processes: 4
max_upload_size: 100M
# Backup settings
backup_schedule: "0 2 * * *"
backup_retention_days: 14
# Monitoring settings
monitoring_checks:
  - http_response
  - ssl_certificate
  - disk_usage
  - memory_usage
# Host-specific service settings
services:
  nginx:
    enabled: true
    state: started
  php-fpm:
    enabled: true
    state: started
  redis:
    enabled: false
    state: stopped
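The effect of these files on `web1.example.com` can be sketched as a series of dictionary updates applied from the least specific layer to the most specific one. This is an illustration under two assumptions: `webservers` is a child of `production`, as in the inventory earlier in this chapter, and Ansible's default behavior of replacing a variable outright (rather than deep-merging dictionaries) is in effect. `merge_vars` and the trimmed variable sets are hypothetical.

```python
from typing import Any, Dict, List


def merge_vars(layers: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Apply layers in order; later layers replace earlier values."""
    merged: Dict[str, Any] = {}
    for layer in layers:
        merged.update(layer)
    return merged


# Trimmed copies of the files above
all_vars = {"log_retention_days": 30, "backup_retention_days": 7, "ssh_port": 22}
production_vars = {"log_level": "warning", "worker_processes": "auto"}
webservers_vars = {"http_port": 80, "nginx_worker_processes": "auto"}
web1_vars = {"nginx_worker_processes": 4, "backup_retention_days": 14}

# all -> parent group -> child group -> host
effective = merge_vars([all_vars, production_vars, webservers_vars, web1_vars])

if __name__ == "__main__":
    for key in sorted(effective):
        print(f"{key}: {effective[key]}")
```

On a real inventory, `ansible-inventory -i inventory/ --host web1.example.com` prints the variables Ansible actually resolves for the host.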
7.4.2 Organizing Variable Files
# Recommended variable file layout
inventory/
├── hosts.yml # Main inventory file
├── group_vars/
│ ├── all.yml # Global variables
│ ├── all/
│ │ ├── common.yml # Common settings
│ │ ├── security.yml # Security settings
│ │ └── monitoring.yml # Monitoring settings
│ ├── production.yml # Production environment variables
│ ├── staging.yml # Staging environment variables
│ ├── webservers.yml # Web server group variables
│ ├── webservers/
│ │ ├── nginx.yml # Nginx settings
│ │ ├── ssl.yml # SSL settings
│ │ └── performance.yml # Performance settings
│ ├── databases.yml # Database group variables
│ ├── databases/
│ │ ├── mysql.yml # MySQL settings
│ │ ├── backup.yml # Backup settings
│ │ └── replication.yml # Replication settings
│ └── loadbalancers.yml # Load balancer group variables
├── host_vars/
│ ├── web1.example.com.yml # Host-specific variables
│ ├── web2.example.com.yml
│ ├── db1.example.com.yml
│ └── lb1.example.com.yml
└── vault/
├── production.yml # Production secrets
├── staging.yml # Staging secrets
└── common.yml # Shared secrets
# inventory/group_vars/all/common.yml
# Common settings
timezone: UTC
locale: en_US.UTF-8
# Package management
package_update_cache: true
package_upgrade: false
# User management
admin_users:
  - name: admin
    shell: /bin/bash
    groups: [sudo, wheel]
    ssh_keys:
      - "ssh-rsa AAAAB3NzaC1yc2E... admin@example.com"
# Network settings
dns_servers:
  - 8.8.8.8
  - 8.8.4.4
  - 1.1.1.1
# Log settings
log_rotation:
  frequency: daily
  retention: 30
  compress: true
# Working directories
temp_dir: /tmp
work_dir: /opt/ansible-work
# inventory/group_vars/all/security.yml
# Security settings
# SSH hardening
ssh_config:
  port: 22
  permit_root_login: false
  password_authentication: false
  pubkey_authentication: true
  challenge_response_authentication: false
  use_pam: true
  x11_forwarding: false
  max_auth_tries: 3
  client_alive_interval: 300
  client_alive_count_max: 2
# Firewall settings
firewall:
  enabled: true
  default_policy: deny
  allowed_ports:
    - 22/tcp # SSH
    - 80/tcp # HTTP
    - 443/tcp # HTTPS
# System security
system_security:
  disable_unused_services: true
  enable_selinux: true
  kernel_hardening: true
  file_permissions_check: true
# Password policy
password_policy:
  min_length: 12
  require_uppercase: true
  require_lowercase: true
  require_numbers: true
  require_special_chars: true
  max_age: 90
  min_age: 1
  history: 12
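7.5 Advanced Inventory Features
7.5.1 Host Patterns and Filtering
# Basic host selection
ansible all -m ping # all hosts
ansible webservers -m ping # the webservers group
ansible web1.example.com -m ping # a single host
# Pattern matching
ansible 'web*' -m ping # wildcard match
ansible 'webservers[0]' -m ping # subscript: the first host in the webservers group
ansible '~web\d+' -m ping # regular expression match
# Combining patterns
ansible 'webservers:databases' -m ping # union (OR)
ansible 'webservers:&production' -m ping # intersection (AND)
ansible 'webservers:!staging' -m ping # exclusion (NOT)
# Complex patterns
ansible 'webservers:&production:!web3*' -m ping # production web servers, excluding web3*
ansible 'webservers:databases:&production' -m ping # web or database servers in production (patterns do not support parentheses)
# Filtering on variables
# --limit accepts host patterns only, not variable expressions such as ansible_os_family=="RedHat".
# To select hosts by a variable or fact, create groups from it (group_by, or keyed_groups in an inventory plugin) or use a when: condition in the play.
ansible all -m ping --limit 'production:&webservers' # restrict a run to production web servers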
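The union, intersection, and exclusion operators can be modeled with Python sets. The sketch below is a simplified model rather than Ansible's implementation: it splits the pattern on `:`, matches host names with `fnmatch`, and ignores subscripts, regular expressions, and IPv6 addresses. `GROUPS`, `resolve_term`, and `select_hosts` are hypothetical names. It does mirror one documented behavior: unions are applied first, then intersections, then exclusions, regardless of the order in which they are written.

```python
from fnmatch import fnmatch
from typing import Dict, Set

GROUPS: Dict[str, Set[str]] = {
    "webservers": {"web1.example.com", "web2.example.com", "web3.example.com"},
    "databases": {"db1.example.com", "db2.example.com"},
    "production": {
        "web1.example.com", "web2.example.com", "web3.example.com",
        "db1.example.com", "db2.example.com",
    },
    "staging": {"staging-web1.example.com"},
}


def resolve_term(term: str, groups: Dict[str, Set[str]]) -> Set[str]:
    """Resolve one term to hosts: 'all', a group name, or a host wildcard."""
    all_hosts = set().union(*groups.values())
    if term == "all":
        return all_hosts
    if term in groups:
        return set(groups[term])
    return {host for host in all_hosts if fnmatch(host, term)}


def select_hosts(pattern: str, groups: Dict[str, Set[str]]) -> Set[str]:
    terms = pattern.split(":")
    unions = [t for t in terms if not t.startswith(("&", "!"))] or ["all"]
    intersections = [t[1:] for t in terms if t.startswith("&")]
    exclusions = [t[1:] for t in terms if t.startswith("!")]
    selected: Set[str] = set()
    for term in unions:
        selected |= resolve_term(term, groups)
    for term in intersections:
        selected &= resolve_term(term, groups)
    for term in exclusions:
        selected -= resolve_term(term, groups)
    return selected


if __name__ == "__main__":
    print(sorted(select_hosts("webservers:&production:!web3*", GROUPS)))
```

To check a real pattern without running anything on the hosts, `ansible 'webservers:&production' --list-hosts` prints the hosts it selects.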
7.5.2 Advanced Inventory Script Example
#!/usr/bin/env python3
# inventory/advanced_inventory.py
import argparse
import json
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml


def warn(message: str) -> None:
    """Write warnings to stderr so that stdout carries only the JSON Ansible parses."""
    print(f"Warning: {message}", file=sys.stderr)


class AdvancedInventory:
    def __init__(self):
        self.config_file = os.environ.get('INVENTORY_CONFIG', 'inventory_config.yml')
        self.cache_dir = os.environ.get('INVENTORY_CACHE_DIR', '/tmp/ansible-inventory-cache')
        self.cache_timeout = int(os.environ.get('INVENTORY_CACHE_TIMEOUT', '3600'))
        self.load_config()
        self.read_cli_args()
        if self.args.list:
            self.inventory = self.get_inventory()
        elif self.args.host:
            self.inventory = self.get_host_vars(self.args.host)
        else:
            self.inventory = self.empty_inventory()
        print(json.dumps(self.inventory, indent=2))

    def load_config(self):
        """Load the configuration file, falling back to defaults."""
        try:
            with open(self.config_file, 'r') as f:
                # An empty file loads as None, so fall back to the defaults
                self.config = yaml.safe_load(f) or self.get_default_config()
        except FileNotFoundError:
            self.config = self.get_default_config()

    def get_default_config(self) -> Dict[str, Any]:
        """Default configuration."""
        return {
            'sources': [
                {
                    'type': 'static',
                    'file': 'static_hosts.yml'
                },
                {
                    'type': 'aws_ec2',
                    'regions': ['us-east-1', 'us-west-2'],
                    'filters': {
                        'instance-state-name': 'running'
                    }
                },
                {
                    'type': 'consul',
                    'url': 'http://consul.example.com:8500',
                    'datacenter': 'dc1'
                }
            ],
            'grouping': {
                'by_environment': True,
                'by_role': True,
                'by_region': True
            },
            'variables': {
                'global': {
                    'ansible_user': 'ubuntu',
                    'ansible_ssh_private_key_file': '~/.ssh/id_rsa'
                }
            }
        }

    def read_cli_args(self):
        parser = argparse.ArgumentParser()
        parser.add_argument('--list', action='store_true')
        parser.add_argument('--host', action='store')
        parser.add_argument('--refresh-cache', action='store_true')
        self.args = parser.parse_args()

    def get_inventory(self) -> Dict[str, Any]:
        """Build the complete inventory."""
        # Check the cache first
        if not self.args.refresh_cache:
            cached_inventory = self.load_from_cache()
            if cached_inventory:
                return cached_inventory
        inventory = {
            '_meta': {
                'hostvars': {}
            }
        }
        # Collect hosts from every configured source
        all_hosts = []
        for source in self.config.get('sources', []):
            hosts = self.get_hosts_from_source(source)
            all_hosts.extend(hosts)
        # Process each host record
        for host_info in all_hosts:
            hostname = host_info['hostname']
            # Apply grouping rules
            self.apply_grouping_rules(inventory, host_info)
            # Set host variables
            host_vars = self.build_host_vars(host_info)
            inventory['_meta']['hostvars'][hostname] = host_vars
        # Apply global variables
        self.apply_global_variables(inventory)
        # Cache the result
        self.save_to_cache(inventory)
        return inventory

    def get_hosts_from_source(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Fetch hosts from one configured source."""
        source_type = source['type']
        if source_type == 'static':
            return self.get_static_hosts(source)
        elif source_type == 'aws_ec2':
            return self.get_aws_ec2_hosts(source)
        elif source_type == 'consul':
            return self.get_consul_hosts(source)
        elif source_type == 'script':
            return self.get_script_hosts(source)
        else:
            return []

    def get_static_hosts(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Read hosts from a static YAML file."""
        try:
            with open(source['file'], 'r') as f:
                data = yaml.safe_load(f) or {}
            hosts = []
            for hostname, host_data in data.get('hosts', {}).items():
                host_info = {
                    'hostname': hostname,
                    'source': 'static',
                    **(host_data or {})
                }
                hosts.append(host_info)
            return hosts
        except FileNotFoundError:
            return []

    def get_aws_ec2_hosts(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Fetch hosts from AWS EC2."""
        try:
            import boto3
            hosts = []
            for region in source.get('regions', ['us-east-1']):
                ec2 = boto3.client('ec2', region_name=region)
                filters = []
                for key, value in source.get('filters', {}).items():
                    if isinstance(value, list):
                        filters.append({'Name': key, 'Values': value})
                    else:
                        filters.append({'Name': key, 'Values': [value]})
                response = ec2.describe_instances(Filters=filters)
                for reservation in response['Reservations']:
                    for instance in reservation['Instances']:
                        hostname = self.get_ec2_hostname(instance)
                        if hostname:
                            host_info = {
                                'hostname': hostname,
                                'source': 'aws_ec2',
                                'ip_address': instance.get('PublicIpAddress', instance.get('PrivateIpAddress')),
                                'private_ip': instance.get('PrivateIpAddress'),
                                'public_ip': instance.get('PublicIpAddress'),
                                'instance_id': instance['InstanceId'],
                                'instance_type': instance['InstanceType'],
                                'region': region,
                                'availability_zone': instance['Placement']['AvailabilityZone'],
                                'vpc_id': instance.get('VpcId'),
                                'subnet_id': instance.get('SubnetId'),
                                'tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
                            }
                            hosts.append(host_info)
            return hosts
        except ImportError:
            warn("boto3 not installed, skipping AWS EC2 source")
            return []
        except Exception as e:
            warn(f"Failed to get AWS EC2 hosts: {e}")
            return []

    def get_consul_hosts(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Fetch hosts from Consul."""
        try:
            import requests
            url = source['url']
            datacenter = source.get('datacenter', 'dc1')
            # List all services
            services_url = f"{url}/v1/catalog/services?dc={datacenter}"
            response = requests.get(services_url, timeout=30)
            response.raise_for_status()
            services = response.json()
            hosts = []
            for service_name in services.keys():
                # List the instances of each service
                service_url = f"{url}/v1/catalog/service/{service_name}?dc={datacenter}"
                response = requests.get(service_url, timeout=30)
                response.raise_for_status()
                instances = response.json()
                for instance in instances:
                    hostname = instance['Node']
                    host_info = {
                        'hostname': hostname,
                        'source': 'consul',
                        'ip_address': instance['Address'],
                        'service_name': service_name,
                        'service_port': instance['ServicePort'],
                        'datacenter': datacenter,
                        'tags': instance.get('ServiceTags') or []
                    }
                    hosts.append(host_info)
            return hosts
        except ImportError:
            warn("requests not installed, skipping Consul source")
            return []
        except Exception as e:
            warn(f"Failed to get Consul hosts: {e}")
            return []

    def get_script_hosts(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Fetch hosts from another inventory script."""
        try:
            script_path = source['script']
            result = subprocess.run(
                [script_path, '--list'],
                capture_output=True,
                text=True,
                timeout=60
            )
            if result.returncode == 0:
                data = json.loads(result.stdout)
                hosts = []
                for hostname, host_vars in data.get('_meta', {}).get('hostvars', {}).items():
                    host_info = {
                        'hostname': hostname,
                        'source': 'script',
                        **host_vars
                    }
                    hosts.append(host_info)
                return hosts
            else:
                warn(f"Script {script_path} failed: {result.stderr}")
                return []
        except Exception as e:
            warn(f"Failed to run script {source.get('script')}: {e}")
            return []

    def get_ec2_hostname(self, instance: Dict[str, Any]) -> Optional[str]:
        """Return the inventory host name for an EC2 instance."""
        # Prefer the Name tag
        for tag in instance.get('Tags', []):
            if tag['Key'] == 'Name':
                return tag['Value']
        # Fall back to the instance ID
        return instance['InstanceId']

    @staticmethod
    def get_tag(host_info: Dict[str, Any], name: str) -> Optional[str]:
        """Return a tag value; Consul supplies tags as a list, which has no keys."""
        tags = host_info.get('tags')
        return tags.get(name) if isinstance(tags, dict) else None

    def apply_grouping_rules(self, inventory: Dict[str, Any], host_info: Dict[str, Any]):
        """Apply the configured grouping rules to one host."""
        hostname = host_info['hostname']
        grouping_config = self.config.get('grouping', {})
        # Group by environment
        if grouping_config.get('by_environment'):
            environment = host_info.get('environment') or self.get_tag(host_info, 'Environment')
            if environment:
                self.add_to_group(inventory, f"env_{environment}", hostname)
        # Group by role
        if grouping_config.get('by_role'):
            role = host_info.get('role') or self.get_tag(host_info, 'Role')
            if role:
                self.add_to_group(inventory, f"role_{role}", hostname)
        # Group by region
        if grouping_config.get('by_region'):
            region = host_info.get('region') or host_info.get('availability_zone', '')[:-1]
            if region:
                self.add_to_group(inventory, f"region_{region}", hostname)
        # Group by source
        source = host_info.get('source')
        if source:
            self.add_to_group(inventory, f"source_{source}", hostname)

    def add_to_group(self, inventory: Dict[str, Any], group_name: str, hostname: str):
        """Add a host to a group, creating the group if needed."""
        if group_name not in inventory:
            inventory[group_name] = {
                'hosts': [],
                'vars': {}
            }
        if hostname not in inventory[group_name]['hosts']:
            inventory[group_name]['hosts'].append(hostname)

    def build_host_vars(self, host_info: Dict[str, Any]) -> Dict[str, Any]:
        """Build the variables for one host."""
        host_vars = {
            'ansible_host': host_info.get('ip_address'),
            'inventory_source': host_info.get('source')
        }
        # Add source-specific variables
        source = host_info.get('source')
        if source == 'aws_ec2':
            host_vars.update({
                'ec2_instance_id': host_info.get('instance_id'),
                'ec2_instance_type': host_info.get('instance_type'),
                'ec2_region': host_info.get('region'),
                'ec2_availability_zone': host_info.get('availability_zone'),
                'ec2_vpc_id': host_info.get('vpc_id'),
                'ec2_subnet_id': host_info.get('subnet_id'),
                'ec2_private_ip': host_info.get('private_ip'),
                'ec2_public_ip': host_info.get('public_ip'),
                'ec2_tags': host_info.get('tags', {})
            })
        elif source == 'consul':
            host_vars.update({
                'consul_service_name': host_info.get('service_name'),
                'consul_service_port': host_info.get('service_port'),
                'consul_datacenter': host_info.get('datacenter'),
                'consul_tags': host_info.get('tags', [])
            })
        # Copy the remaining host fields as-is
        for key, value in host_info.items():
            if key not in ['hostname', 'source', 'ip_address']:
                host_vars[key] = value
        return host_vars

    def apply_global_variables(self, inventory: Dict[str, Any]):
        """Apply global variables to every host."""
        global_vars = self.config.get('variables', {}).get('global', {})
        # Add each global variable unless the host already defines it
        for hostname in inventory['_meta']['hostvars']:
            for key, value in global_vars.items():
                if key not in inventory['_meta']['hostvars'][hostname]:
                    inventory['_meta']['hostvars'][hostname][key] = value

    def load_from_cache(self) -> Optional[Dict[str, Any]]:
        """Load the inventory from the cache if it is still fresh."""
        cache_file = Path(self.cache_dir) / 'inventory.json'
        if cache_file.exists():
            cache_mtime = cache_file.stat().st_mtime
            if (cache_mtime + self.cache_timeout) > time.time():
                try:
                    with open(cache_file, 'r') as f:
                        return json.load(f)
                except Exception:
                    pass
        return None

    def save_to_cache(self, inventory: Dict[str, Any]):
        """Save the inventory to the cache."""
        cache_dir = Path(self.cache_dir)
        cache_dir.mkdir(parents=True, exist_ok=True)
        cache_file = cache_dir / 'inventory.json'
        try:
            with open(cache_file, 'w') as f:
                json.dump(inventory, f, indent=2)
        except Exception as e:
            warn(f"Failed to save cache: {e}")

    def get_host_vars(self, hostname: str) -> Dict[str, Any]:
        """Return the variables of a single host."""
        inventory = self.get_inventory()
        return inventory['_meta']['hostvars'].get(hostname, {})

    def empty_inventory(self) -> Dict[str, Any]:
        """Return an empty inventory."""
        return {'_meta': {'hostvars': {}}}


if __name__ == '__main__':
    AdvancedInventory()
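The cache check in the script compares the cache file's modification time with a timeout. That logic is easier to test when the current time can be passed in, as in the sketch below. `load_if_fresh` is a hypothetical standalone helper with the same intent as the script's cache loading, not a drop-in replacement for it.

```python
import json
import time
from pathlib import Path
from typing import Any, Dict, Optional


def load_if_fresh(cache_file: Path, ttl_seconds: int,
                  now: Optional[float] = None) -> Optional[Dict[str, Any]]:
    """Return the cached inventory, or None if it is missing, stale, or unreadable."""
    now = time.time() if now is None else now
    try:
        age = now - cache_file.stat().st_mtime
        if age > ttl_seconds:
            return None  # older than the allowed time to live
        with cache_file.open() as handle:
            return json.load(handle)
    except (OSError, json.JSONDecodeError):
        # A missing or corrupt cache is treated the same as no cache
        return None


if __name__ == "__main__":
    cached = load_if_fresh(Path("/tmp/ansible-inventory-cache/inventory.json"), 3600)
    print("cache hit" if cached is not None else "cache miss")
```

Passing `now` explicitly lets a test simulate an expired cache without waiting or changing file timestamps.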
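7.6 Chapter Summary
This chapter covered the main aspects of managing an Ansible inventory:
- Inventory overview: what an inventory does and which types exist
- Static inventory: host lists in INI and YAML format
- Dynamic inventory: host discovery through scripts and plugins
- Variable management: variable precedence and file layout
- Advanced features: host patterns, filtering, and a multi-source inventory script
Inventory management underpins effective use of Ansible: it determines which hosts an automation task runs against and how flexibly configuration can be managed.
7.7 Exercises
Basic exercises
Creating a static inventory
- Create an INI-format inventory that covers several environments
- Convert it to YAML format and add host variables
- Set host connection parameters and group variables
Variable management
- Design a sensible variable file layout
- Manage variables hierarchically
- Test variable precedence
Intermediate exercises
Dynamic inventory
- Write a custom dynamic inventory script
- Integrate a cloud platform API
- Implement a caching mechanism
Host filtering
- Practice complex host patterns
- Select hosts based on variables
- Combine several filter conditions
Hands-on exercises
Multi-source inventory
- Combine static and dynamic inventories
- Manage hosts across a multi-cloud environment
- Design fault-tolerance and fallback mechanisms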