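7.1 Inventory Overview

7.1.1 What Is an Inventory

The inventory is one of Ansible's core components. It defines the hosts and host groups that Ansible can manage. Beyond the host list itself, an inventory can also carry variables, connection parameters, and the organizational structure of the hosts.

An inventory serves to:
- Define target hosts and host groups
- Store host-specific variables and connection parameters
- Organize hosts into a hierarchy
- Support dynamic host discovery
- Provide the basis for filtering and selecting hosts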

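7.1.2 Inventory Types

# Static inventory
# A manually maintained host list, usually stored in a file

# Dynamic inventory
# Hosts discovered automatically by a script or plugin

# Hybrid inventory
# A combination of static and dynamic sources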

7.2 Static Inventory

7.2.1 INI-Format Inventory

# inventory/hosts.ini
# Basic host definitions
[webservers]
web1.example.com
web2.example.com
web3.example.com

[databases]
db1.example.com
db2.example.com

[loadbalancers]
lb1.example.com
lb2.example.com

# Using IP addresses
[cache_servers]
192.168.1.10
192.168.1.11
192.168.1.12

# Specifying a port and connection parameters
[app_servers]
app1.example.com ansible_port=2222 ansible_user=deploy
app2.example.com ansible_port=2222 ansible_user=deploy
app3.example.com ansible_port=2222 ansible_user=deploy

# Using host ranges (both ends are inclusive)
[web_cluster]
web[01:10].example.com

[db_cluster]
db[a:f].example.com

# Host aliases
[monitoring]
jumpbox ansible_host=jump.example.com ansible_user=admin
logserver ansible_host=logs.example.com ansible_port=2222

# Groups of groups (parent/child relationships)
[production:children]
webservers
databases
loadbalancers

[staging:children]
staging_web
staging_db

[staging_web]
staging-web1.example.com
staging-web2.example.com

[staging_db]
staging-db1.example.com

# Group variables
[webservers:vars]
http_port=80
https_port=443
nginx_worker_processes=auto

[databases:vars]
mysql_port=3306
# Plaintext for illustration only; keep real secrets in Ansible Vault
mysql_root_password=secret
max_connections=200

[production:vars]
environment=production
log_level=warning
backup_enabled=true

[staging:vars]
environment=staging
log_level=debug
backup_enabled=false
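
To make the host range syntax above concrete, here is a minimal Python sketch of how a pattern such as `web[01:10].example.com` might be expanded. The function name `expand_host_range` is hypothetical and not part of Ansible. The sketch handles one range per pattern and assumes, as in Ansible, that both ends are inclusive and that leading zeros in the start value set the padding width.

```python
import re
from typing import List

# Matches a single [start:end] range such as [01:10] or [a:f].
RANGE_RE = re.compile(r"\[([0-9]+|[a-z]):([0-9]+|[a-z])\]")


def expand_host_range(pattern: str) -> List[str]:
    """Expand one numeric or alphabetic [start:end] range in a host pattern."""
    match = RANGE_RE.search(pattern)
    if match is None:
        return [pattern]  # no range present: the pattern is a plain host name

    start, end = match.group(1), match.group(2)
    prefix, suffix = pattern[:match.start()], pattern[match.end():]

    if start.isdigit() and end.isdigit():
        width = len(start)  # "01" pads every value to two digits
        values = [str(n).zfill(width) for n in range(int(start), int(end) + 1)]
    elif start.isalpha() and end.isalpha():
        values = [chr(c) for c in range(ord(start), ord(end) + 1)]
    else:
        raise ValueError(f"mixed numeric/alphabetic range in {pattern!r}")

    return [f"{prefix}{value}{suffix}" for value in values]


if __name__ == "__main__":
    for host in expand_host_range("db[a:f].example.com"):
        print(host)
```

Running the sketch prints the six hosts `dba.example.com` through `dbf.example.com`, which is the set the `[db_cluster]` group above stands for.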

7.2.2 YAML-Format Inventory

# inventory/hosts.yml
all:
  children:
    production:
      children:
        webservers:
          hosts:
            web1.example.com:
              ansible_host: 10.0.1.10
              server_role: primary
            web2.example.com:
              ansible_host: 10.0.1.11
              server_role: secondary
            web3.example.com:
              ansible_host: 10.0.1.12
              server_role: secondary
          vars:
            http_port: 80
            https_port: 443
            nginx_worker_processes: auto
            environment: production
        
        databases:
          hosts:
            db1.example.com:
              ansible_host: 10.0.2.10
              mysql_server_id: 1
              mysql_role: master
            db2.example.com:
              ansible_host: 10.0.2.11
              mysql_server_id: 2
              mysql_role: slave
          vars:
            mysql_port: 3306
            mysql_root_password: "{{ vault_mysql_root_password }}"
            max_connections: 200
            innodb_buffer_pool_size: 1G
        
        loadbalancers:
          hosts:
            lb1.example.com:
              ansible_host: 10.0.3.10
              priority: 100
            lb2.example.com:
              ansible_host: 10.0.3.11
              priority: 90
          vars:
            keepalived_interface: eth0
            virtual_ip: 10.0.3.100
      
      vars:
        environment: production
        log_level: warning
        backup_enabled: true
        monitoring_enabled: true
    
    staging:
      children:
        staging_web:
          hosts:
            staging-web1.example.com:
              ansible_host: 10.0.10.10
            staging-web2.example.com:
              ansible_host: 10.0.10.11
          vars:
            http_port: 8080
            https_port: 8443
        
        staging_db:
          hosts:
            staging-db1.example.com:
              ansible_host: 10.0.11.10
          vars:
            mysql_port: 3306
      
      vars:
        environment: staging
        log_level: debug
        backup_enabled: false
        monitoring_enabled: false
    
    development:
      hosts:
        dev1.example.com:
          ansible_host: 192.168.1.100
          ansible_user: developer
        dev2.example.com:
          ansible_host: 192.168.1.101
          ansible_user: developer
      vars:
        environment: development
        log_level: debug
        backup_enabled: false
    
    # Special-purpose hosts
    monitoring:
      hosts:
        prometheus.example.com:
          ansible_host: 10.0.4.10
          prometheus_port: 9090
        grafana.example.com:
          ansible_host: 10.0.4.11
          grafana_port: 3000
        elk.example.com:
          ansible_host: 10.0.4.12
          elasticsearch_port: 9200
          kibana_port: 5601
      vars:
        monitoring_retention_days: 30
        alert_email: admin@example.com
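
The nesting of `children` and `hosts` above means that a parent group contains every host of its descendant groups. The following Python sketch illustrates that idea on a trimmed-down copy of the structure, written as a plain dictionary so that no YAML library is needed. The helper `hosts_in_group` is hypothetical and only approximates how membership is resolved.

```python
from typing import Any, Dict, Set


def hosts_in_group(group: Dict[str, Any]) -> Set[str]:
    """Collect the hosts of a group and of all its descendant groups."""
    hosts = set((group.get("hosts") or {}).keys())
    for child in (group.get("children") or {}).values():
        hosts |= hosts_in_group(child or {})
    return hosts


# A reduced version of inventory/hosts.yml, already parsed into Python objects.
inventory = {
    "all": {
        "children": {
            "production": {
                "children": {
                    "webservers": {
                        "hosts": {"web1.example.com": None, "web2.example.com": None}
                    },
                    "databases": {"hosts": {"db1.example.com": None}},
                }
            },
            "development": {
                "hosts": {"dev1.example.com": {"ansible_user": "developer"}}
            },
        }
    }
}

if __name__ == "__main__":
    production = inventory["all"]["children"]["production"]
    print(sorted(hosts_in_group(production)))
```

In this reduced example, `production` resolves to the two web servers plus the database host, while `all` additionally includes `dev1.example.com`.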

7.2.3 Host Connection Parameters

# inventory/connection-params.ini
[webservers]
# SSH connection parameters
web1.example.com ansible_host=10.0.1.10 ansible_port=22 ansible_user=ubuntu
web2.example.com ansible_host=10.0.1.11 ansible_port=2222 ansible_user=centos

# SSH key authentication
web3.example.com ansible_host=10.0.1.12 ansible_ssh_private_key_file=~/.ssh/web3_key

# SSH password authentication (not recommended)
web4.example.com ansible_host=10.0.1.13 ansible_ssh_pass=password123

# Using a jump host
web5.example.com ansible_host=10.0.1.14 ansible_ssh_common_args='-o ProxyCommand="ssh -W %h:%p jump.example.com"'

# Python interpreter path
web6.example.com ansible_host=10.0.1.15 ansible_python_interpreter=/usr/bin/python3

[windows_servers]
# Windows host connections
win1.example.com ansible_host=10.0.2.10 ansible_user=Administrator ansible_password=WinPass123 ansible_connection=winrm ansible_winrm_transport=basic
win2.example.com ansible_host=10.0.2.11 ansible_user=Administrator ansible_connection=winrm ansible_winrm_server_cert_validation=ignore

[docker_containers]
# Docker container connections
app_container ansible_connection=docker ansible_docker_extra_args="-H=tcp://docker.example.com:2376"
db_container ansible_connection=docker ansible_user=root

[local_tasks]
# Local execution
localhost ansible_connection=local ansible_python_interpreter=/usr/bin/python3
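
Each host line above is a host name followed by `key=value` pairs, with shell-style quoting for values that contain spaces. As a rough illustration, the sketch below splits such a line with Python's `shlex` module. The function `parse_host_line` is hypothetical; unlike Ansible, it leaves every value as a string (Ansible would, for instance, read `ansible_port=2222` as an integer).

```python
import shlex
from typing import Dict, Tuple


def parse_host_line(line: str) -> Tuple[str, Dict[str, str]]:
    """Split an INI inventory host line into (host name, variables)."""
    tokens = shlex.split(line)  # honours single and double quotes
    if not tokens:
        raise ValueError("empty host line")

    name, variables = tokens[0], {}
    for token in tokens[1:]:
        key, sep, value = token.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {token!r}")
        variables[key] = value
    return name, variables


if __name__ == "__main__":
    line = (
        "web5.example.com ansible_host=10.0.1.14 "
        "ansible_ssh_common_args='-o ProxyCommand=\"ssh -W %h:%p jump.example.com\"'"
    )
    host, host_vars = parse_host_line(line)
    print(host)
    print(host_vars["ansible_ssh_common_args"])
```

The quoted `ProxyCommand` survives as one value because the outer single quotes keep the inner double quotes and spaces intact.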

7.3 Dynamic Inventory

7.3.1 Dynamic Inventory Script

#!/usr/bin/env python3
# inventory/dynamic_inventory.py

import json
import argparse
import requests
import sys
from typing import Dict, List, Any

class DynamicInventory:
    def __init__(self):
        self.inventory = {
            '_meta': {
                'hostvars': {}
            }
        }
        self.read_cli_args()
        
        # Act on the command-line arguments
        if self.args.list:
            self.inventory = self.get_inventory()
        elif self.args.host:
            self.inventory = self.get_host_vars(self.args.host)
        else:
            self.inventory = self.empty_inventory()
        
        print(json.dumps(self.inventory, indent=2))
    
    def read_cli_args(self):
        parser = argparse.ArgumentParser()
        parser.add_argument('--list', action='store_true')
        parser.add_argument('--host', action='store')
        self.args = parser.parse_args()
    
    def get_inventory(self) -> Dict[str, Any]:
        """Build the complete inventory."""
        inventory = {
            '_meta': {
                'hostvars': {}
            }
        }
        
        # Fetch host records from a CMDB or cloud provider API
        hosts_data = self.fetch_hosts_from_api()

        # Group hosts by role
        for host_info in hosts_data:
            hostname = host_info['hostname']
            role = host_info.get('role', 'ungrouped')
            environment = host_info.get('environment', 'unknown')
            
            # Create the role group
            if role not in inventory:
                inventory[role] = {
                    'hosts': [],
                    'vars': {}
                }
            inventory[role]['hosts'].append(hostname)
            
            # Create the per-environment role group
            env_group = f"{environment}_{role}"
            if env_group not in inventory:
                inventory[env_group] = {
                    'hosts': [],
                    'vars': {
                        'environment': environment
                    }
                }
            inventory[env_group]['hosts'].append(hostname)
            
            # Set host variables
            inventory['_meta']['hostvars'][hostname] = {
                'ansible_host': host_info.get('ip_address'),
                'ansible_user': host_info.get('ssh_user', 'ubuntu'),
                'server_role': role,
                'environment': environment,
                'instance_type': host_info.get('instance_type'),
                'region': host_info.get('region'),
                'tags': host_info.get('tags', {})
            }
        
        # Create parent groups
        self.create_parent_groups(inventory)
        
        return inventory
    
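    def fetch_hosts_from_api(self) -> List[Dict[str, Any]]:
        """Fetch host records from the API."""
        # Example: query a CMDB API for host records
        try:
            response = requests.get(
                'https://cmdb.example.com/api/hosts',
                headers={'Authorization': 'Bearer YOUR_API_TOKEN'},
                timeout=30
            )
            response.raise_for_status()
            return response.json()['hosts']
        except (requests.RequestException, ValueError, KeyError) as e:
            # If the API is unavailable or returns malformed data, fall back to
            # static data. Warn on stderr so that stdout stays valid JSON.
            print(f"Warning: CMDB API unavailable, using fallback hosts: {e}", file=sys.stderr)
            return self.get_fallback_hosts()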
    
    def get_fallback_hosts(self) -> List[Dict[str, Any]]:
        """Static fallback host data for when the API is unavailable."""
        return [
            {
                'hostname': 'web1.example.com',
                'ip_address': '10.0.1.10',
                'role': 'webserver',
                'environment': 'production',
                'instance_type': 't3.medium',
                'region': 'us-east-1',
                'ssh_user': 'ubuntu',
                'tags': {'Team': 'DevOps', 'Project': 'WebApp'}
            },
            {
                'hostname': 'web2.example.com',
                'ip_address': '10.0.1.11',
                'role': 'webserver',
                'environment': 'production',
                'instance_type': 't3.medium',
                'region': 'us-east-1',
                'ssh_user': 'ubuntu',
                'tags': {'Team': 'DevOps', 'Project': 'WebApp'}
            },
            {
                'hostname': 'db1.example.com',
                'ip_address': '10.0.2.10',
                'role': 'database',
                'environment': 'production',
                'instance_type': 't3.large',
                'region': 'us-east-1',
                'ssh_user': 'ubuntu',
                'tags': {'Team': 'DBA', 'Project': 'WebApp'}
            }
        ]
    
    def create_parent_groups(self, inventory: Dict[str, Any]):
        """Create one parent group per environment."""
        # Read environment names from the host variables rather than splitting
        # group names on '_', which misfires for roles such as 'web_server'.
        environments = {
            host_vars['environment']
            for host_vars in inventory['_meta']['hostvars'].values()
        }

        for env in environments:
            env_groups = [g for g in inventory if g.startswith(f"{env}_")]
            if env_groups:
                inventory[env] = {
                    'children': env_groups,
                    'vars': {
                        'environment': env
                    }
                }
    
    def get_host_vars(self, hostname: str) -> Dict[str, Any]:
        """Return the variables of a single host."""
        inventory = self.get_inventory()
        return inventory['_meta']['hostvars'].get(hostname, {})
    
    def empty_inventory(self) -> Dict[str, Any]:
        """Return an empty inventory."""
        return {'_meta': {'hostvars': {}}}

if __name__ == '__main__':
    DynamicInventory()
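
A script like the one above has to print a single JSON object on stdout when called with `--list`. The sketch below shows one way to sanity-check that output before pointing Ansible at the script. The function `validate_inventory` and its rules are assumptions made for illustration; they are stricter than Ansible itself, which also accepts some legacy shapes such as a bare list of hosts for a group.

```python
import json
from typing import List

ALLOWED_GROUP_KEYS = {"hosts", "children", "vars"}


def validate_inventory(raw: str) -> List[str]:
    """Return the problems found in --list output; an empty list means none."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc}"]
    if not isinstance(data, dict):
        return ["top level must be a JSON object"]

    problems = []
    meta = data.get("_meta")
    hostvars = meta.get("hostvars") if isinstance(meta, dict) else None
    if not isinstance(hostvars, dict):
        problems.append("_meta.hostvars is missing or not an object")

    for name, group in data.items():
        if name == "_meta":
            continue
        if not isinstance(group, dict):
            problems.append(f"group {name!r} is not an object")
            continue
        unknown = set(group) - ALLOWED_GROUP_KEYS
        if unknown:
            problems.append(f"group {name!r} has unexpected keys: {sorted(unknown)}")
        for key in ("hosts", "children"):
            if not isinstance(group.get(key, []), list):
                problems.append(f"group {name!r}: {key!r} must be a list")
    return problems


if __name__ == "__main__":
    sample = json.dumps({
        "_meta": {"hostvars": {"web1.example.com": {"ansible_host": "10.0.1.10"}}},
        "webserver": {"hosts": ["web1.example.com"], "vars": {}},
        "production": {"children": ["production_webserver"], "vars": {}},
    })
    print(validate_inventory(sample) or "no problems found")
```

In practice the `raw` string would be the captured stdout of the inventory script. Because any stray `print` on stdout breaks the JSON, warnings belong on stderr.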

7.3.2 AWS EC2 Dynamic Inventory

#!/usr/bin/env python3
# inventory/aws_ec2_inventory.py

import boto3
import json
import argparse
from typing import Dict, List, Any

class EC2DynamicInventory:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.inventory = {
            '_meta': {
                'hostvars': {}
            }
        }
        self.read_cli_args()
        
        if self.args.list:
            self.inventory = self.get_inventory()
        elif self.args.host:
            self.inventory = self.get_host_vars(self.args.host)
        
        print(json.dumps(self.inventory, indent=2))
    
    def read_cli_args(self):
        parser = argparse.ArgumentParser()
        parser.add_argument('--list', action='store_true')
        parser.add_argument('--host', action='store')
        self.args = parser.parse_args()
    
    def get_inventory(self) -> Dict[str, Any]:
        """Build the inventory from AWS EC2."""
        inventory = {
            '_meta': {
                'hostvars': {}
            }
        }
        
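        # Fetch all running EC2 instances. describe_instances is paginated,
        # so use a paginator to avoid silently truncating large fleets.
        paginator = self.ec2.get_paginator('describe_instances')
        pages = paginator.paginate(
            Filters=[
                {
                    'Name': 'instance-state-name',
                    'Values': ['running']
                }
            ]
        )
        reservations = [r for page in pages for r in page['Reservations']]

        for reservation in reservations: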
            for instance in reservation['Instances']:
                hostname = self.get_hostname(instance)
                if not hostname:
                    continue
                
                # Group by tags
                self.group_by_tags(inventory, instance, hostname)

                # Group by instance type
                instance_type = instance['InstanceType']
                self.add_to_group(inventory, f"type_{instance_type}", hostname)

                # Group by availability zone
                az = instance['Placement']['AvailabilityZone']
                self.add_to_group(inventory, f"az_{az}", hostname)

                # Group by VPC
                vpc_id = instance.get('VpcId', 'no-vpc')
                self.add_to_group(inventory, f"vpc_{vpc_id}", hostname)

                # Set host variables
                inventory['_meta']['hostvars'][hostname] = {
                    'ansible_host': instance.get('PublicIpAddress', instance.get('PrivateIpAddress')),
                    'ansible_user': self.get_ssh_user(instance),
                    'ec2_instance_id': instance['InstanceId'],
                    'ec2_instance_type': instance['InstanceType'],
                    'ec2_availability_zone': az,
                    'ec2_region': instance['Placement']['AvailabilityZone'][:-1],
                    'ec2_vpc_id': vpc_id,
                    'ec2_subnet_id': instance.get('SubnetId'),
                    'ec2_private_ip': instance.get('PrivateIpAddress'),
                    'ec2_public_ip': instance.get('PublicIpAddress'),
                    'ec2_state': instance['State']['Name'],
                    'ec2_tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
                }
        
        return inventory
    
    def get_hostname(self, instance: Dict[str, Any]) -> str:
        """Return the inventory hostname for an instance."""
        # Prefer the Name tag
        for tag in instance.get('Tags', []):
            if tag['Key'] == 'Name':
                return tag['Value']

        # Fall back to the instance ID
        return instance['InstanceId']
    
    def get_ssh_user(self, instance: Dict[str, Any]) -> str:
        """Guess the SSH user from the AMI name."""
        # Look up the AMI (one API call per instance)
        try:
            ami_response = self.ec2.describe_images(ImageIds=[instance['ImageId']])
            if ami_response['Images']:
                ami_name = ami_response['Images'][0].get('Name', '').lower()
                if 'ubuntu' in ami_name:
                    return 'ubuntu'
                elif 'centos' in ami_name:
                    return 'centos'
                elif 'rhel' in ami_name or 'redhat' in ami_name:
                    return 'ec2-user'
                elif 'amazon' in ami_name:
                    return 'ec2-user'
        except Exception:
            pass
        
        return 'ec2-user'  # default user
    
    def group_by_tags(self, inventory: Dict[str, Any], instance: Dict[str, Any], hostname: str):
        """Group the host by its tags."""
        for tag in instance.get('Tags', []):
            key = tag['Key']
            value = tag['Value']

            # Skip the Name tag
            if key == 'Name':
                continue

            # Create the key/value tag group
            group_name = f"tag_{key}_{value}".replace(' ', '_').replace('-', '_').lower()
            self.add_to_group(inventory, group_name, hostname)

            # Create the tag key group
            key_group = f"tag_{key}".replace(' ', '_').replace('-', '_').lower()
            self.add_to_group(inventory, key_group, hostname)
    
    def add_to_group(self, inventory: Dict[str, Any], group_name: str, hostname: str):
        """Add a host to a group, creating the group if needed."""
        if group_name not in inventory:
            inventory[group_name] = {
                'hosts': [],
                'vars': {}
            }
        
        if hostname not in inventory[group_name]['hosts']:
            inventory[group_name]['hosts'].append(hostname)
    
    def get_host_vars(self, hostname: str) -> Dict[str, Any]:
        """Return the variables of a single host."""
        inventory = self.get_inventory()
        return inventory['_meta']['hostvars'].get(hostname, {})

if __name__ == '__main__':
    EC2DynamicInventory()
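
The script above builds group names such as `type_t3.medium`, `az_us-east-1a`, and `vpc_vpc-...` directly from AWS values. Ansible expects group names to look like variable names (letters, digits, and underscores, not starting with a digit) and warns about names that do not. The following sketch shows one possible normalization. The helper `sanitize_group_name` is hypothetical and would have to be called wherever the script composes a group name.

```python
import re


def sanitize_group_name(*parts: str) -> str:
    """Join name parts with '_' and replace characters Ansible would warn about."""
    raw = "_".join(parts).lower()
    name = re.sub(r"[^a-z0-9_]", "_", raw)  # dots, hyphens, spaces, colons, ...
    if not name or name[0].isdigit():
        name = "_" + name  # group names should not start with a digit
    return name


if __name__ == "__main__":
    print(sanitize_group_name("type", "t3.medium"))
    print(sanitize_group_name("az", "us-east-1a"))
    print(sanitize_group_name("tag", "Team", "Dev Ops"))
```

Applying one function everywhere also keeps the tag groups and the type, zone, and VPC groups consistent, which the chained `replace` calls in `group_by_tags` only do for spaces and hyphens.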

7.3.3 Using Inventory Plugins

# inventory/aws_ec2.yml
# AWS EC2 plugin configuration
plugin: amazon.aws.aws_ec2
regions:
  - us-east-1
  - us-west-2
filters:
  instance-state-name: running
  tag:Environment: [production, staging]

# Grouping configuration
keyed_groups:
  # Group by environment
  - key: tags.Environment
    prefix: env
  # Group by instance type
  - key: instance_type
    prefix: type
  # Group by availability zone
  - key: placement.availability_zone
    prefix: az
  # Group by VPC
  - key: vpc_id
    prefix: vpc

# Hostname configuration (first match wins)
hostnames:
  - tag:Name
  - instance-id

# Composed variables
compose:
  ansible_host: public_ip_address | default(private_ip_address)
  ansible_user: >
    'ubuntu' if (image.name | default('')) | regex_search('ubuntu', ignorecase=True)
    else 'centos' if (image.name | default('')) | regex_search('centos', ignorecase=True)
    else 'ec2-user'
  ec2_instance_type: instance_type
  ec2_region: placement.region
  ec2_availability_zone: placement.availability_zone
  ec2_vpc_id: vpc_id
  ec2_subnet_id: subnet_id
  ec2_private_ip: private_ip_address
  ec2_public_ip: public_ip_address
  ec2_state: state.name

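# Cache configuration
# The jsonfile cache plugin persists to the cache_connection directory;
# the memory plugin would not survive between runs and ignores that path.
cache: true
cache_plugin: jsonfile
cache_timeout: 3600
cache_connection: /tmp/ansible-inventory-cache

# inventory/azure_rm.yml
# Azure plugin configuration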
plugin: azure.azcollection.azure_rm
auth_source: auto
include_vm_resource_groups:
  - production-rg
  - staging-rg

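# Grouping configuration
keyed_groups:
  # Group by resource group
  - key: resource_group
    prefix: rg
  # Group by location
  - key: location
    prefix: location
  # Group by VM size (the plugin exposes it as virtual_machine_size)
  - key: virtual_machine_size
    prefix: size
  # Group by tags
  - key: tags.Environment
    prefix: env
  - key: tags.Role
    prefix: role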

# Hostname configuration
hostnames:
  - name
  - public_ipv4_addresses
  - private_ipv4_addresses

# Composed variables
compose:
  ansible_host: public_ipv4_addresses[0] | default(private_ipv4_addresses[0])
  ansible_user: >
    'ubuntu' if (image.offer | default('')) | regex_search('ubuntu', ignorecase=True)
    else 'centos' if (image.offer | default('')) | regex_search('centos', ignorecase=True)
    else 'azureuser'
  azure_vm_size: virtual_machine_size
  azure_location: location
  azure_resource_group: resource_group
  azure_vm_id: vmid
  azure_power_state: powerstate
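
To illustrate what the `keyed_groups` entries in these plugin configurations do, the sketch below derives group names from one host's variables using `prefix` and `key`. It is a simplified approximation, not the plugins' actual implementation: the real `key` is a Jinja2 expression and may yield lists or dictionaries, whereas the hypothetical `keyed_group_names` helper here only follows dotted paths to scalar values. The `_` separator and the replacement of invalid characters mirror the plugins' default behaviour as far as this sketch assumes.

```python
import re
from typing import Any, Dict, List, Optional


def lookup(hostvars: Dict[str, Any], dotted_key: str) -> Optional[Any]:
    """Follow a dotted path such as 'tags.Environment' through nested dicts."""
    value: Any = hostvars
    for part in dotted_key.split("."):
        if not isinstance(value, dict) or part not in value:
            return None
        value = value[part]
    return value


def keyed_group_names(hostvars: Dict[str, Any], rules: List[Dict[str, str]]) -> List[str]:
    """Build '<prefix>_<value>' group names for every rule whose key resolves."""
    names = []
    for rule in rules:
        value = lookup(hostvars, rule["key"])
        if value is None:
            continue  # the host simply does not join a group for this rule
        raw = f"{rule['prefix']}_{value}"
        names.append(re.sub(r"[^A-Za-z0-9_]", "_", raw))
    return names


if __name__ == "__main__":
    host = {
        "tags": {"Environment": "production"},
        "instance_type": "t3.medium",
        "placement": {"availability_zone": "us-east-1a"},
    }
    rules = [
        {"key": "tags.Environment", "prefix": "env"},
        {"key": "instance_type", "prefix": "type"},
        {"key": "placement.availability_zone", "prefix": "az"},
        {"key": "vpc_id", "prefix": "vpc"},
    ]
    print(keyed_group_names(host, rules))
```

The host in this example has no `vpc_id`, so only three groups result; that matches the intuition that a missing key produces no group rather than an error.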

7.4 Inventory Variable Management

7.4.1 Variable Precedence

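# Variable precedence (lowest to highest), simplified:
# 1. Variables of the all group
# 2. Parent group variables
# 3. Child group variables
# 4. Host variables
# 5. Host facts
# 6. Role variables (roles/<name>/vars)
# 7. Task variables
# 8. set_fact and registered variables
# 9. Extra variables (-e option)
# Role defaults (roles/<name>/defaults) rank below everything listed here.

# inventory/group_vars/all.yml
# Global variables (lowest precedence)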
common_packages:
  - curl
  - wget
  - vim
  - htop

ntp_servers:
  - 0.pool.ntp.org
  - 1.pool.ntp.org

log_retention_days: 30
backup_retention_days: 7

# Security settings
ssh_port: 22
ssh_permit_root_login: false
ssh_password_authentication: false

# inventory/group_vars/production.yml
# Production environment variables
environment: production
log_level: warning
monitoring_enabled: true
backup_enabled: true

# Production-specific settings
max_connections: 1000
worker_processes: auto
cache_ttl: 3600

# Security settings
firewall_enabled: true
ssl_required: true
fail2ban_enabled: true

# Monitoring settings
prometheus_scrape_interval: 15s
alert_manager_enabled: true
log_shipping_enabled: true

# inventory/group_vars/webservers.yml
# Web server group variables
http_port: 80
https_port: 443
nginx_worker_processes: auto
nginx_worker_connections: 1024

# SSL settings
ssl_certificate_path: /etc/ssl/certs
ssl_private_key_path: /etc/ssl/private
ssl_protocols:
  - TLSv1.2
  - TLSv1.3

# Cache settings
nginx_proxy_cache_path: /var/cache/nginx
nginx_proxy_cache_levels: "1:2"
nginx_proxy_cache_max_size: 1g

# Logging settings
nginx_access_log: /var/log/nginx/access.log
nginx_error_log: /var/log/nginx/error.log
log_format: combined

# inventory/host_vars/web1.example.com.yml
# Host-specific variables (high precedence)
server_role: primary
server_weight: 100

# Host-specific settings
nginx_worker_processes: 4
max_upload_size: 100M

# Backup settings
backup_schedule: "0 2 * * *"
backup_retention_days: 14

# Monitoring settings
monitoring_checks:
  - http_response
  - ssl_certificate
  - disk_usage
  - memory_usage

# Host-specific service settings
services:
  nginx:
    enabled: true
    state: started
  php-fpm:
    enabled: true
    state: started
  redis:
    enabled: false
    state: stopped
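
The effect of the inventory-level precedence rules on the example files above can be sketched as a sequence of dictionary updates, applied from the lowest-precedence layer to the highest. This is only an approximation for illustration: the helper `merge_by_precedence` is hypothetical, and it assumes Ansible's default behaviour in which a higher-precedence value replaces a lower one outright rather than being deep-merged.

```python
from typing import Any, Dict


def merge_by_precedence(*layers: Dict[str, Any]) -> Dict[str, Any]:
    """Merge variable layers; later (higher-precedence) layers win."""
    merged: Dict[str, Any] = {}
    for layer in layers:
        merged.update(layer)
    return merged


# Excerpts from the example variable files shown above.
all_vars = {"log_retention_days": 30, "backup_retention_days": 7}
production_vars = {"log_level": "warning", "worker_processes": "auto"}
webservers_vars = {"http_port": 80, "nginx_worker_processes": "auto"}
web1_vars = {"nginx_worker_processes": 4, "backup_retention_days": 14}

if __name__ == "__main__":
    # Order: all group, parent group, child group, host.
    effective = merge_by_precedence(all_vars, production_vars, webservers_vars, web1_vars)
    for key in sorted(effective):
        print(f"{key}: {effective[key]}")
```

For `web1.example.com` the host-level values win: `nginx_worker_processes` becomes 4 and `backup_retention_days` becomes 14, while `log_retention_days` is still inherited from the `all` group.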

7.4.2 Organizing Variable Files

# Recommended variable file layout
inventory/
├── hosts.yml                    # Main inventory file
├── group_vars/
│   ├── all.yml                  # Global variables
│   ├── all/
│   │   ├── common.yml           # Common settings
│   │   ├── security.yml         # Security settings
│   │   └── monitoring.yml       # Monitoring settings
│   ├── production.yml           # Production environment variables
│   ├── staging.yml              # Staging environment variables
│   ├── webservers.yml           # Web server group variables
│   ├── webservers/
│   │   ├── nginx.yml            # Nginx settings
│   │   ├── ssl.yml              # SSL settings
│   │   └── performance.yml      # Performance settings
│   ├── databases.yml            # Database group variables
│   ├── databases/
│   │   ├── mysql.yml            # MySQL settings
│   │   ├── backup.yml           # Backup settings
│   │   └── replication.yml      # Replication settings
│   └── loadbalancers.yml        # Load balancer group variables
├── host_vars/
│   ├── web1.example.com.yml     # Host-specific variables
│   ├── web2.example.com.yml
│   ├── db1.example.com.yml
│   └── lb1.example.com.yml
└── vault/
    ├── production.yml           # Production secrets
    ├── staging.yml              # Staging secrets
    └── common.yml               # Shared secrets

# inventory/group_vars/all/common.yml
# Common configuration variables
timezone: UTC
locale: en_US.UTF-8

# Package management
package_update_cache: true
package_upgrade: false

# User management
admin_users:
  - name: admin
    shell: /bin/bash
    groups: [sudo, wheel]
    ssh_keys:
      - "ssh-rsa AAAAB3NzaC1yc2E... admin@example.com"

# Network settings
dns_servers:
  - 8.8.8.8
  - 8.8.4.4
  - 1.1.1.1

# Logging settings
log_rotation:
  frequency: daily
  retention: 30
  compress: true

# Temporary directories
temp_dir: /tmp
work_dir: /opt/ansible-work

# inventory/group_vars/all/security.yml
# Security configuration variables
# SSH hardening settings
ssh_config:
  port: 22
  permit_root_login: false
  password_authentication: false
  pubkey_authentication: true
  challenge_response_authentication: false
  use_pam: true
  x11_forwarding: false
  max_auth_tries: 3
  client_alive_interval: 300
  client_alive_count_max: 2

# Firewall settings
firewall:
  enabled: true
  default_policy: deny
  allowed_ports:
    - 22/tcp    # SSH
    - 80/tcp    # HTTP
    - 443/tcp   # HTTPS

# System security
system_security:
  disable_unused_services: true
  enable_selinux: true
  kernel_hardening: true
  file_permissions_check: true

# Password policy
password_policy:
  min_length: 12
  require_uppercase: true
  require_lowercase: true
  require_numbers: true
  require_special_chars: true
  max_age: 90
  min_age: 1
  history: 12

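7.5 Advanced Inventory Features

7.5.1 Host Patterns and Filtering

# Basic host selection
ansible all -m ping                          # all hosts
ansible webservers -m ping                   # the webservers group
ansible web1.example.com -m ping             # a single host

# Pattern matching
ansible 'web*' -m ping                       # wildcard match
ansible 'web[1-3]*' -m ping                  # glob character class: web1*, web2*, web3*
ansible '~web\d+' -m ping                    # regular expression match

# Combining patterns
ansible 'webservers:databases' -m ping       # union (OR)
ansible 'webservers:&production' -m ping     # intersection (AND)
ansible 'webservers:!staging' -m ping        # exclusion (NOT)

# Complex patterns
ansible 'webservers:&production:!web3*' -m ping  # production web servers, excluding web3*
# Parentheses are not part of the pattern syntax. Unions are evaluated first,
# then intersections, then exclusions, so the following already means
# (webservers OR databases) AND production:
ansible 'webservers:databases:&production' -m ping

# Filtering by variable
# --limit accepts host patterns only, not variable expressions. To select hosts
# by a variable or fact, first turn it into a group (for example with the
# group_by module or the constructed inventory plugin), then target that group.
ansible all -m ping --limit 'production'            # restrict the run to the production group
ansible all -m ping --limit 'webservers:!web3*'     # --limit uses the same pattern syntax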
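
The union, intersection, and exclusion operators map naturally onto set operations. The sketch below evaluates a colon-separated pattern against a dictionary of groups. It is a deliberately reduced model: the `select_hosts` function is hypothetical, it only understands plain group names (no wildcards, regular expressions, or subscripts), and it assumes the evaluation order of unions first, then intersections, then exclusions.

```python
from typing import Dict, List, Set


def select_hosts(pattern: str, groups: Dict[str, Set[str]]) -> Set[str]:
    """Evaluate 'a:b:&c:!d' style patterns over named host sets."""
    unions: List[str] = []
    intersections: List[str] = []
    exclusions: List[str] = []
    for term in pattern.split(":"):
        if term.startswith("&"):
            intersections.append(term[1:])
        elif term.startswith("!"):
            exclusions.append(term[1:])
        else:
            unions.append(term)

    selected: Set[str] = set()
    for name in unions:            # 1. unions
        selected |= groups.get(name, set())
    for name in intersections:     # 2. intersections
        selected &= groups.get(name, set())
    for name in exclusions:        # 3. exclusions
        selected -= groups.get(name, set())
    return selected


if __name__ == "__main__":
    groups = {
        "webservers": {"web1", "web2", "staging-web1"},
        "databases": {"db1", "staging-db1"},
        "production": {"web1", "web2", "db1"},
        "staging": {"staging-web1", "staging-db1"},
    }
    print(sorted(select_hosts("webservers:databases:&production", groups)))
    print(sorted(select_hosts("webservers:!staging", groups)))
```

Because intersections are applied after all unions, the first pattern selects production hosts from both groups without any need for grouping parentheses.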

7.5.2 Advanced Inventory Script Example
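
The script in this section calls cache helpers named `load_from_cache` and `save_to_cache`. In case it helps to see the idea in isolation, the following sketch shows one way a file-based cache with a timeout could work. It is written as two standalone functions whose signatures are assumptions made for this illustration; they are not the script's own methods.

```python
import json
import time
from pathlib import Path
from typing import Any, Dict, Optional


def load_from_cache(cache_file: Path, timeout: int) -> Optional[Dict[str, Any]]:
    """Return the cached inventory, or None if it is missing, stale, or unreadable."""
    try:
        age = time.time() - cache_file.stat().st_mtime
        if age > timeout:
            return None  # older than the allowed number of seconds
        return json.loads(cache_file.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None


def save_to_cache(cache_file: Path, inventory: Dict[str, Any]) -> None:
    """Write the inventory atomically so readers never see a partial file."""
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    tmp_file = cache_file.with_suffix(".tmp")
    tmp_file.write_text(json.dumps(inventory), encoding="utf-8")
    tmp_file.replace(cache_file)  # atomic rename on the same filesystem


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        cache = Path(tmp_dir) / "cache" / "inventory.json"
        save_to_cache(cache, {"_meta": {"hostvars": {}}})
        print(load_from_cache(cache, timeout=3600))
```

Treating an unreadable or corrupt cache file the same as a missing one keeps the inventory script working after an interrupted run; it simply rebuilds the inventory from its sources.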

#!/usr/bin/env python3
# inventory/advanced_inventory.py

import json
import argparse
import yaml
import os
import subprocess
from typing import Dict, List, Any, Optional
from pathlib import Path

class AdvancedInventory:
    def __init__(self):
        self.config_file = os.environ.get('INVENTORY_CONFIG', 'inventory_config.yml')
        self.cache_dir = os.environ.get('INVENTORY_CACHE_DIR', '/tmp/ansible-inventory-cache')
        self.cache_timeout = int(os.environ.get('INVENTORY_CACHE_TIMEOUT', '3600'))
        
        self.load_config()
        self.read_cli_args()
        
        if self.args.list:
            self.inventory = self.get_inventory()
        elif self.args.host:
            self.inventory = self.get_host_vars(self.args.host)
        else:
            self.inventory = self.empty_inventory()
        
        print(json.dumps(self.inventory, indent=2))
    
    def load_config(self):
        """Load the configuration file."""
        try:
            with open(self.config_file, 'r') as f:
                self.config = yaml.safe_load(f)
        except FileNotFoundError:
            self.config = self.get_default_config()
    
    def get_default_config(self) -> Dict[str, Any]:
        """Return the default configuration."""
        return {
            'sources': [
                {
                    'type': 'static',
                    'file': 'static_hosts.yml'
                },
                {
                    'type': 'aws_ec2',
                    'regions': ['us-east-1', 'us-west-2'],
                    'filters': {
                        'instance-state-name': 'running'
                    }
                },
                {
                    'type': 'consul',
                    'url': 'http://consul.example.com:8500',
                    'datacenter': 'dc1'
                }
            ],
            'grouping': {
                'by_environment': True,
                'by_role': True,
                'by_region': True
            },
            'variables': {
                'global': {
                    'ansible_user': 'ubuntu',
                    'ansible_ssh_private_key_file': '~/.ssh/id_rsa'
                }
            }
        }
    
    def read_cli_args(self):
        parser = argparse.ArgumentParser()
        parser.add_argument('--list', action='store_true')
        parser.add_argument('--host', action='store')
        parser.add_argument('--refresh-cache', action='store_true')
        self.args = parser.parse_args()
    
    def get_inventory(self) -> Dict[str, Any]:
        """Build the complete inventory."""
        # Check the cache first
        if not self.args.refresh_cache:
            cached_inventory = self.load_from_cache()
            if cached_inventory:
                return cached_inventory
        
        inventory = {
            '_meta': {
                'hostvars': {}
            }
        }
        
        # Collect host records from every configured source
        all_hosts = []
        for source in self.config.get('sources', []):
            hosts = self.get_hosts_from_source(source)
            all_hosts.extend(hosts)

        # Process the host records
        for host_info in all_hosts:
            hostname = host_info['hostname']

            # Apply the grouping rules
            self.apply_grouping_rules(inventory, host_info)

            # Set host variables
            host_vars = self.build_host_vars(host_info)
            inventory['_meta']['hostvars'][hostname] = host_vars

        # Apply global variables
        self.apply_global_variables(inventory)

        # Cache the result
        self.save_to_cache(inventory)
        
        return inventory
    
    def get_hosts_from_source(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Fetch host records from the given source."""
        source_type = source['type']
        
        if source_type == 'static':
            return self.get_static_hosts(source)
        elif source_type == 'aws_ec2':
            return self.get_aws_ec2_hosts(source)
        elif source_type == 'consul':
            return self.get_consul_hosts(source)
        elif source_type == 'script':
            return self.get_script_hosts(source)
        else:
            return []
    
    def get_static_hosts(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Load hosts from a static file."""
        try:
            with open(source['file'], 'r') as f:
                data = yaml.safe_load(f)
            
            hosts = []
            for hostname, host_data in data.get('hosts', {}).items():
                host_info = {
                    'hostname': hostname,
                    'source': 'static',
                    **host_data
                }
                hosts.append(host_info)
            
            return hosts
        except FileNotFoundError:
            return []
    
    def get_aws_ec2_hosts(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Fetch hosts from AWS EC2."""
        try:
            import boto3
            
            hosts = []
            for region in source.get('regions', ['us-east-1']):
                ec2 = boto3.client('ec2', region_name=region)
                
                filters = []
                for key, value in source.get('filters', {}).items():
                    if isinstance(value, list):
                        filters.append({'Name': key, 'Values': value})
                    else:
                        filters.append({'Name': key, 'Values': [value]})
                
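                # describe_instances is paginated; collect every page
                paginator = ec2.get_paginator('describe_instances')
                reservations = [
                    r
                    for page in paginator.paginate(Filters=filters)
                    for r in page['Reservations']
                ]

                for reservation in reservations: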
                    for instance in reservation['Instances']:
                        hostname = self.get_ec2_hostname(instance)
                        if hostname:
                            host_info = {
                                'hostname': hostname,
                                'source': 'aws_ec2',
                                'ip_address': instance.get('PublicIpAddress', instance.get('PrivateIpAddress')),
                                'private_ip': instance.get('PrivateIpAddress'),
                                'public_ip': instance.get('PublicIpAddress'),
                                'instance_id': instance['InstanceId'],
                                'instance_type': instance['InstanceType'],
                                'region': region,
                                'availability_zone': instance['Placement']['AvailabilityZone'],
                                'vpc_id': instance.get('VpcId'),
                                'subnet_id': instance.get('SubnetId'),
                                'tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
                            }
                            hosts.append(host_info)
            
            return hosts
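        except ImportError:
            import sys  # local import: the module header does not import sys
            # Warnings go to stderr; stdout must contain only the inventory JSON
            print("Warning: boto3 not installed, skipping AWS EC2 source", file=sys.stderr)
            return []
        except Exception as e:
            import sys
            print(f"Warning: Failed to get AWS EC2 hosts: {e}", file=sys.stderr)
            return []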
    
    def get_consul_hosts(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Fetch hosts from Consul."""
        try:
            import requests
            
            url = source['url']
            datacenter = source.get('datacenter', 'dc1')
            
            # List all services
            services_url = f"{url}/v1/catalog/services?dc={datacenter}"
            response = requests.get(services_url, timeout=30)
            response.raise_for_status()
            services = response.json()
            
            hosts = []
            for service_name in services.keys():
                # Fetch the instances of this service
                service_url = f"{url}/v1/catalog/service/{service_name}?dc={datacenter}"
                response = requests.get(service_url, timeout=30)
                response.raise_for_status()
                instances = response.json()
                
                for instance in instances:
                    hostname = instance['Node']
                    host_info = {
                        'hostname': hostname,
                        'source': 'consul',
                        'ip_address': instance['Address'],
                        'service_name': service_name,
                        'service_port': instance['ServicePort'],
                        'datacenter': datacenter,
                        'tags': instance.get('ServiceTags', [])
                    }
                    hosts.append(host_info)
            
            return hosts
        except ImportError:
            print("Warning: requests not installed, skipping Consul source")
            return []
        except Exception as e:
            print(f"Warning: Failed to get Consul hosts: {e}")
            return []
    
    def get_script_hosts(self, source: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Fetch hosts by running an external inventory script."""
        try:
            script_path = source['script']
            result = subprocess.run(
                [script_path, '--list'],
                capture_output=True,
                text=True,
                timeout=60
            )
            
            if result.returncode == 0:
                data = json.loads(result.stdout)
                hosts = []
                
                for hostname, host_vars in data.get('_meta', {}).get('hostvars', {}).items():
                    host_info = {
                        'hostname': hostname,
                        'source': 'script',
                        **host_vars
                    }
                    hosts.append(host_info)
                
                return hosts
            else:
                print(f"Warning: Script {script_path} failed: {result.stderr}")
                return []
        except Exception as e:
            # Use .get() so a missing 'script' key cannot raise again inside this handler
            print(f"Warning: Failed to run script {source.get('script')}: {e}")
            return []
    
    def get_ec2_hostname(self, instance: Dict[str, Any]) -> Optional[str]:
        """Return the hostname for an EC2 instance."""
        # Prefer the Name tag
        for tag in instance.get('Tags', []):
            if tag['Key'] == 'Name':
                return tag['Value']
        
        # Fall back to the instance ID
        return instance['InstanceId']
    
    def apply_grouping_rules(self, inventory: Dict[str, Any], host_info: Dict[str, Any]):
        """Apply the grouping rules from the configuration."""
        hostname = host_info['hostname']
        grouping_config = self.config.get('grouping', {})
        
        # Tags are a dict for EC2 hosts but a list for Consul hosts;
        # only a dict supports key lookups such as 'Environment' or 'Role'.
        tags = host_info.get('tags')
        if not isinstance(tags, dict):
            tags = {}
        
        # Group by environment
        if grouping_config.get('by_environment'):
            environment = host_info.get('environment') or tags.get('Environment')
            if environment:
                self.add_to_group(inventory, f"env_{environment}", hostname)
        
        # Group by role
        if grouping_config.get('by_role'):
            role = host_info.get('role') or tags.get('Role')
            if role:
                self.add_to_group(inventory, f"role_{role}", hostname)
        
        # Group by region (derived from the availability zone if no region is set)
        if grouping_config.get('by_region'):
            region = host_info.get('region') or (host_info.get('availability_zone') or '')[:-1]
            if region:
                self.add_to_group(inventory, f"region_{region}", hostname)
        
        # Group by source
        source = host_info.get('source')
        if source:
            self.add_to_group(inventory, f"source_{source}", hostname)
    
    def add_to_group(self, inventory: Dict[str, Any], group_name: str, hostname: str):
        """Add a host to a group, creating the group if needed."""
        if group_name not in inventory:
            inventory[group_name] = {
                'hosts': [],
                'vars': {}
            }
        
        if hostname not in inventory[group_name]['hosts']:
            inventory[group_name]['hosts'].append(hostname)
    
    def build_host_vars(self, host_info: Dict[str, Any]) -> Dict[str, Any]:
        """Build the host variables for one host."""
        host_vars = {
            'ansible_host': host_info.get('ip_address'),
            'inventory_source': host_info.get('source')
        }
        
        # Add source-specific variables
        source = host_info.get('source')
        if source == 'aws_ec2':
            host_vars.update({
                'ec2_instance_id': host_info.get('instance_id'),
                'ec2_instance_type': host_info.get('instance_type'),
                'ec2_region': host_info.get('region'),
                'ec2_availability_zone': host_info.get('availability_zone'),
                'ec2_vpc_id': host_info.get('vpc_id'),
                'ec2_subnet_id': host_info.get('subnet_id'),
                'ec2_private_ip': host_info.get('private_ip'),
                'ec2_public_ip': host_info.get('public_ip'),
                'ec2_tags': host_info.get('tags', {})
            })
        elif source == 'consul':
            host_vars.update({
                'consul_service_name': host_info.get('service_name'),
                'consul_service_port': host_info.get('service_port'),
                'consul_datacenter': host_info.get('datacenter'),
                'consul_tags': host_info.get('tags', [])
            })
        
        # Copy the remaining host fields as they are
        for key, value in host_info.items():
            if key not in ['hostname', 'source', 'ip_address']:
                host_vars[key] = value
        
        return host_vars
    
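    def apply_global_variables(self, inventory: Dict[str, Any]):
        """Apply the global variables from the configuration."""
        global_vars = self.config.get('variables', {}).get('global', {})
        
        # Add global variables to every host without overriding existing values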
        for hostname in inventory['_meta']['hostvars']:
            for key, value in global_vars.items():
                if key not in inventory['_meta']['hostvars'][hostname]:
                    inventory['_meta']['hostvars'][hostname][key] = value
    
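    def load_from_cache(self) -> Optional[Dict[str, Any]]:
        """Load the inventory from the cache if the cache is still fresh."""
        # Imported locally so this method also works when the module is
        # imported rather than run as a script
        import time
        
        cache_file = Path(self.cache_dir) / 'inventory.json'
        
        if cache_file.exists():
            # st_mtime is the last-modified timestamp, not an age
            cache_mtime = cache_file.stat().st_mtime
            if (cache_mtime + self.cache_timeout) > time.time():
                try:
                    with open(cache_file, 'r') as f:
                        return json.load(f)
                except Exception:
                    # Treat an unreadable or corrupt cache as a cache miss
                    pass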
        
        return None
    
    def save_to_cache(self, inventory: Dict[str, Any]):
        """Save the inventory to the cache."""
        cache_dir = Path(self.cache_dir)
        cache_dir.mkdir(parents=True, exist_ok=True)
        
        cache_file = cache_dir / 'inventory.json'
        try:
            with open(cache_file, 'w') as f:
                json.dump(inventory, f, indent=2)
        except Exception as e:
            print(f"Warning: Failed to save cache: {e}")
    
    def get_host_vars(self, hostname: str) -> Dict[str, Any]:
        """Return the variables for a single host."""
        inventory = self.get_inventory()
        return inventory['_meta']['hostvars'].get(hostname, {})
    
    def empty_inventory(self) -> Dict[str, Any]:
        """Return an empty inventory."""
        return {'_meta': {'hostvars': {}}}

if __name__ == '__main__':
    import time
    AdvancedInventory()
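
The methods above assemble the JSON structure that a dynamic inventory script returns for `--list`: group names that map to `hosts` and `vars`, plus a `_meta.hostvars` section. The following is a minimal, self-contained sketch of that shape, not the script itself. The function `build_inventory` and the sample hosts are hypothetical and exist only for illustration; the sketch also assumes that tags may arrive either as a dict (EC2 style) or as a list (Consul style).

```python
import json
from typing import Any, Dict, List


def add_to_group(inventory: Dict[str, Any], group_name: str, hostname: str) -> None:
    """Create the group on first use and add the host only once."""
    group = inventory.setdefault(group_name, {"hosts": [], "vars": {}})
    if hostname not in group["hosts"]:
        group["hosts"].append(hostname)


def build_inventory(hosts: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Group hosts by source and environment and collect their host variables."""
    inventory: Dict[str, Any] = {"_meta": {"hostvars": {}}}
    for host in hosts:
        name = host["hostname"]
        add_to_group(inventory, f"source_{host['source']}", name)
        tags = host.get("tags")
        # Only dict-style tags carry key/value pairs such as Environment.
        env = tags.get("Environment") if isinstance(tags, dict) else None
        if env:
            add_to_group(inventory, f"env_{env}", name)
        inventory["_meta"]["hostvars"][name] = {"ansible_host": host.get("ip_address")}
    return inventory


# Hypothetical sample data, for illustration only.
SAMPLE_HOSTS = [
    {"hostname": "web1", "source": "aws_ec2", "ip_address": "10.0.1.10",
     "tags": {"Environment": "prod"}},
    {"hostname": "db1", "source": "consul", "ip_address": "10.0.2.10",
     "tags": ["primary"]},
]

if __name__ == "__main__":
    print(json.dumps(build_inventory(SAMPLE_HOSTS), indent=2))
```

Keeping `_meta.hostvars` in the `--list` output lets Ansible read all host variables in one call instead of invoking the script once per host with `--host`.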

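7.6 Chapter Summary

This chapter covered the main aspects of managing an Ansible Inventory:

  • Inventory overview: what an Inventory does and which types exist
  • Static Inventory: static host lists in INI and YAML format
  • Dynamic Inventory: dynamic host discovery with scripts and plugins
  • Variable management: variable precedence and file organization
  • Advanced features: host filtering, pattern matching, and complex Inventory scripts

Inventory management is the foundation for using Ansible effectively: it determines which hosts an automation task runs against and how flexibly configuration can be managed.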

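7.7 Exercises

Basic Exercises

  1. Creating a static Inventory

    • Create an INI-format Inventory that covers multiple environments
    • Convert it to YAML format and add host variables
    • Set host connection parameters and group variables
  2. Variable management

    • Design a sensible variable file structure
    • Manage variables hierarchically
    • Test variable precedence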

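Advanced Exercises

  1. Dynamic Inventory

    • Write a custom dynamic Inventory script
    • Integrate a cloud platform API
    • Implement a caching mechanism
  2. Host filtering

    • Practice complex host pattern matching
    • Filter hosts based on variables
    • Combine multiple filter conditions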
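
As a possible starting point for the variable-based filtering exercise, the sketch below selects hosts from the `_meta.hostvars` section of a dynamic inventory. It is not Ansible's own pattern matching; `filter_hosts` and the sample inventory are hypothetical names used only for illustration, and the sketch assumes that a host matches when every given variable equals the requested value.

```python
from typing import Any, Dict, List


def filter_hosts(inventory: Dict[str, Any], **conditions: Any) -> List[str]:
    """Return the sorted names of hosts whose variables match every condition."""
    hostvars = inventory.get("_meta", {}).get("hostvars", {})
    return sorted(
        name
        for name, host_vars in hostvars.items()
        if all(host_vars.get(key) == value for key, value in conditions.items())
    )


# Hypothetical inventory data, for illustration only.
SAMPLE_INVENTORY = {
    "_meta": {
        "hostvars": {
            "web1": {"environment": "production", "role": "web"},
            "web2": {"environment": "staging", "role": "web"},
            "db1": {"environment": "production", "role": "db"},
        }
    }
}

if __name__ == "__main__":
    # Combine two conditions: production hosts that also have the web role.
    print(filter_hosts(SAMPLE_INVENTORY, environment="production", role="web"))
```

Passing several keyword arguments combines the conditions with a logical AND; calling the function without conditions returns every host.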

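Hands-on Exercises

  1. Multi-source Inventory
    • Combine static and dynamic Inventory
    • Manage hosts across a multi-cloud environment
    • Design fault-tolerance and fallback mechanisms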
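
One way to approach the fault-tolerance part of this exercise is to query every source, skip the ones that fail, and use a fallback list when no source returns any host. The sketch below illustrates that idea under these assumptions; `collect_hosts`, `cloud_source`, and `static_source` are hypothetical names and not part of Ansible or of the script in this chapter.

```python
import sys
from typing import Any, Callable, Dict, List

Host = Dict[str, Any]


def collect_hosts(sources: List[Callable[[], List[Host]]], fallback: List[Host]) -> List[Host]:
    """Query every source, skip the ones that fail, and fall back if none returns hosts."""
    hosts: List[Host] = []
    for fetch in sources:
        try:
            hosts.extend(fetch())
        except Exception as exc:
            # Report on stderr so stdout stays reserved for the inventory JSON.
            print(f"Warning: {fetch.__name__} failed: {exc}", file=sys.stderr)
    return hosts if hosts else list(fallback)


def cloud_source() -> List[Host]:
    """Hypothetical dynamic source that is currently unreachable."""
    raise ConnectionError("API unreachable")


def static_source() -> List[Host]:
    """Hypothetical static source."""
    return [{"hostname": "web1", "source": "static"}]


if __name__ == "__main__":
    print(collect_hosts([cloud_source, static_source], fallback=[]))
```

A failing source only produces a warning, so one unreachable cloud API does not empty the whole inventory.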

Next chapter: Chapter 8, Roles in Detail
