1.1 微服务架构概述

什么是微服务架构

微服务架构是一种将单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,并使用轻量级机制(通常是HTTP资源API)进行通信。这些服务围绕业务功能构建,并且可以由全自动部署机制独立部署。

微服务架构的核心特征

# 微服务架构特征清单
apiVersion: v1
kind: ConfigMap
metadata:
  name: microservices-characteristics
  namespace: architecture
data:
  business-capability: "围绕业务功能组织服务"
  decentralized-governance: "去中心化治理"
  failure-isolation: "故障隔离"
  evolutionary-design: "演进式设计"
  infrastructure-automation: "基础设施自动化"
  design-for-failure: "为失败而设计"

微服务 vs 单体架构

# 架构对比分析脚本
#!/usr/bin/env python3

import json
from typing import Dict, List

class ArchitectureComparison:
    def __init__(self):
        self.monolith_characteristics = {
            "deployment": "单一部署单元",
            "scaling": "整体扩展",
            "technology": "统一技术栈",
            "data": "共享数据库",
            "team": "大型团队",
            "complexity": "初期简单,后期复杂"
        }
        
        self.microservices_characteristics = {
            "deployment": "独立部署",
            "scaling": "按需扩展",
            "technology": "技术多样性",
            "data": "数据库分离",
            "team": "小型自治团队",
            "complexity": "分布式复杂性"
        }
    
    def compare_architectures(self) -> Dict:
        """比较两种架构的特点"""
        comparison = {
            "monolith": self.monolith_characteristics,
            "microservices": self.microservices_characteristics,
            "decision_factors": {
                "team_size": "团队规模和组织结构",
                "system_complexity": "系统复杂度",
                "scalability_requirements": "扩展性需求",
                "deployment_frequency": "部署频率",
                "technology_diversity": "技术多样性需求"
            }
        }
        return comparison
    
    def generate_decision_matrix(self) -> str:
        """生成架构选择决策矩阵"""
        matrix = """
        架构选择决策矩阵:
        
        | 因素 | 单体架构 | 微服务架构 |
        |------|----------|------------|
        | 团队规模 | < 10人 | > 10人 |
        | 系统复杂度 | 中低 | 高 |
        | 部署频率 | 低 | 高 |
        | 扩展需求 | 统一扩展 | 差异化扩展 |
        | 技术栈 | 统一 | 多样化 |
        | 运维成本 | 低 | 高 |
        | 开发速度 | 初期快 | 长期快 |
        """
        return matrix

if __name__ == "__main__":
    comparison = ArchitectureComparison()
    result = comparison.compare_architectures()
    print(json.dumps(result, ensure_ascii=False, indent=2))
    print(comparison.generate_decision_matrix())

1.2 微服务架构的优势与挑战

优势分析

// 微服务优势分析
package main

import (
    "fmt"
    "time"
)

type MicroservicesBenefit struct {
    Name        string
    Description string
    Impact      string
    Examples    []string
}

func main() {
    benefits := []MicroservicesBenefit{
        {
            Name:        "技术多样性",
            Description: "每个服务可以选择最适合的技术栈",
            Impact:      "提高开发效率和系统性能",
            Examples:    []string{"用户服务用Java", "推荐服务用Python", "实时服务用Go"},
        },
        {
            Name:        "独立部署",
            Description: "服务可以独立发布和部署",
            Impact:      "降低部署风险,提高发布频率",
            Examples:    []string{"蓝绿部署", "金丝雀发布", "滚动更新"},
        },
        {
            Name:        "故障隔离",
            Description: "单个服务故障不影响整个系统",
            Impact:      "提高系统可用性和稳定性",
            Examples:    []string{"熔断器", "舱壁模式", "超时控制"},
        },
        {
            Name:        "团队自治",
            Description: "小团队负责端到端的服务开发",
            Impact:      "提高团队效率和责任感",
            Examples:    []string{"DevOps实践", "全栈开发", "业务对齐"},
        },
    }
    
    fmt.Println("微服务架构优势分析:")
    for _, benefit := range benefits {
        fmt.Printf("\n优势: %s\n", benefit.Name)
        fmt.Printf("描述: %s\n", benefit.Description)
        fmt.Printf("影响: %s\n", benefit.Impact)
        fmt.Printf("示例: %v\n", benefit.Examples)
    }
}

挑战与解决方案

# 微服务挑战与解决方案配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: microservices-challenges
  namespace: architecture
data:
  distributed-complexity: |
    挑战: 分布式系统复杂性
    解决方案:
    - 服务网格 (Istio, Linkerd)
    - API网关 (Kong, Zuul)
    - 配置中心 (Consul, Nacos)
    
  data-consistency: |
    挑战: 数据一致性
    解决方案:
    - 事件驱动架构
    - Saga模式
    - 最终一致性
    
  service-communication: |
    挑战: 服务间通信
    解决方案:
    - 同步通信 (HTTP/gRPC)
    - 异步通信 (消息队列)
    - 服务发现
    
  monitoring-observability: |
    挑战: 监控和可观测性
    解决方案:
    - 分布式追踪 (Jaeger, Zipkin)
    - 指标监控 (Prometheus)
    - 日志聚合 (ELK Stack)
    
  testing-complexity: |
    挑战: 测试复杂性
    解决方案:
    - 契约测试
    - 端到端测试
    - 混沌工程

1.3 微服务设计原则

单一职责原则

// 服务职责定义示例
public class ServiceResponsibilityAnalyzer {
    
    public static class UserService {
        // 单一职责: 用户管理
        public User createUser(UserRequest request) {
            // 用户创建逻辑
            return new User();
        }
        
        public User getUserById(String userId) {
            // 用户查询逻辑
            return new User();
        }
        
        public void updateUser(String userId, UserRequest request) {
            // 用户更新逻辑
        }
        
        public void deleteUser(String userId) {
            // 用户删除逻辑
        }
    }
    
    public static class OrderService {
        // 单一职责: 订单管理
        public Order createOrder(OrderRequest request) {
            // 订单创建逻辑
            return new Order();
        }
        
        public Order getOrderById(String orderId) {
            // 订单查询逻辑
            return new Order();
        }
        
        public void updateOrderStatus(String orderId, OrderStatus status) {
            // 订单状态更新逻辑
        }
    }
    
    // 反例: 违反单一职责原则
    public static class MonolithService {
        // 错误: 混合了用户和订单职责
        public User createUser(UserRequest request) { return new User(); }
        public Order createOrder(OrderRequest request) { return new Order(); }
        public void sendEmail(String email, String content) { }
        public void generateReport() { }
    }
}

服务边界设计

# 服务边界设计工具
class ServiceBoundaryDesigner:
    def __init__(self):
        self.domain_model = {
            "user_management": {
                "entities": ["User", "Profile", "Preference"],
                "operations": ["register", "login", "update_profile"],
                "data": ["user_data", "authentication_data"]
            },
            "order_management": {
                "entities": ["Order", "OrderItem", "Payment"],
                "operations": ["create_order", "process_payment", "track_order"],
                "data": ["order_data", "payment_data"]
            },
            "inventory_management": {
                "entities": ["Product", "Stock", "Warehouse"],
                "operations": ["check_stock", "reserve_item", "update_inventory"],
                "data": ["product_data", "inventory_data"]
            }
        }
    
    def analyze_service_boundaries(self):
        """分析服务边界"""
        boundaries = {}
        for domain, details in self.domain_model.items():
            boundaries[domain] = {
                "service_name": domain.replace("_", "-") + "-service",
                "bounded_context": details,
                "interfaces": self._define_interfaces(details),
                "dependencies": self._analyze_dependencies(domain)
            }
        return boundaries
    
    def _define_interfaces(self, details):
        """定义服务接口"""
        return {
            "rest_api": [f"/{op}" for op in details["operations"]],
            "events": [f"{entity.lower()}_changed" for entity in details["entities"]],
            "commands": details["operations"]
        }
    
    def _analyze_dependencies(self, domain):
        """分析服务依赖"""
        dependencies = {
            "user_management": [],
            "order_management": ["user_management", "inventory_management"],
            "inventory_management": []
        }
        return dependencies.get(domain, [])
    
    def generate_service_map(self):
        """生成服务地图"""
        boundaries = self.analyze_service_boundaries()
        service_map = {
            "services": list(boundaries.keys()),
            "relationships": {},
            "communication_patterns": {
                "synchronous": ["REST API", "gRPC"],
                "asynchronous": ["Event Streaming", "Message Queue"]
            }
        }
        
        for service, details in boundaries.items():
            service_map["relationships"][service] = details["dependencies"]
        
        return service_map

# 使用示例
designer = ServiceBoundaryDesigner()
boundaries = designer.analyze_service_boundaries()
service_map = designer.generate_service_map()

print("服务边界分析结果:")
for service, boundary in boundaries.items():
    print(f"\n服务: {boundary['service_name']}")
    print(f"领域: {service}")
    print(f"实体: {boundary['bounded_context']['entities']}")
    print(f"操作: {boundary['bounded_context']['operations']}")
    print(f"依赖: {boundary['dependencies']}")

数据管理原则

-- 微服务数据管理示例

-- 用户服务数据库
CREATE DATABASE user_service_db;
USE user_service_db;

CREATE TABLE users (
    id VARCHAR(36) PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

CREATE TABLE user_profiles (
    user_id VARCHAR(36) PRIMARY KEY,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    phone VARCHAR(20),
    address TEXT,
    FOREIGN KEY (user_id) REFERENCES users(id)
);

-- 订单服务数据库
CREATE DATABASE order_service_db;
USE order_service_db;

CREATE TABLE orders (
    id VARCHAR(36) PRIMARY KEY,
    user_id VARCHAR(36) NOT NULL, -- 外部引用,不使用外键
    status ENUM('pending', 'confirmed', 'shipped', 'delivered', 'cancelled') DEFAULT 'pending',
    total_amount DECIMAL(10, 2) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

CREATE TABLE order_items (
    id VARCHAR(36) PRIMARY KEY,
    order_id VARCHAR(36) NOT NULL,
    product_id VARCHAR(36) NOT NULL, -- 外部引用
    quantity INT NOT NULL,
    unit_price DECIMAL(10, 2) NOT NULL,
    FOREIGN KEY (order_id) REFERENCES orders(id)
);

-- 库存服务数据库
CREATE DATABASE inventory_service_db;
USE inventory_service_db;

CREATE TABLE products (
    id VARCHAR(36) PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    price DECIMAL(10, 2) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE inventory (
    product_id VARCHAR(36) PRIMARY KEY,
    available_quantity INT NOT NULL DEFAULT 0,
    reserved_quantity INT NOT NULL DEFAULT 0,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    FOREIGN KEY (product_id) REFERENCES products(id)
);

1.4 微服务架构模式

服务发现模式

// 服务发现实现示例
package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// ServiceInstance 服务实例
type ServiceInstance struct {
    ID       string            `json:"id"`
    Name     string            `json:"name"`
    Host     string            `json:"host"`
    Port     int               `json:"port"`
    Metadata map[string]string `json:"metadata"`
    Health   string            `json:"health"`
}

// ServiceRegistry 服务注册中心
type ServiceRegistry struct {
    services map[string][]*ServiceInstance
    mutex    sync.RWMutex
}

func NewServiceRegistry() *ServiceRegistry {
    return &ServiceRegistry{
        services: make(map[string][]*ServiceInstance),
    }
}

// Register 注册服务
func (sr *ServiceRegistry) Register(instance *ServiceInstance) error {
    sr.mutex.Lock()
    defer sr.mutex.Unlock()
    
    if sr.services[instance.Name] == nil {
        sr.services[instance.Name] = make([]*ServiceInstance, 0)
    }
    
    sr.services[instance.Name] = append(sr.services[instance.Name], instance)
    log.Printf("服务注册成功: %s [%s:%d]\n", instance.Name, instance.Host, instance.Port)
    return nil
}

// Discover 发现服务
func (sr *ServiceRegistry) Discover(serviceName string) ([]*ServiceInstance, error) {
    sr.mutex.RLock()
    defer sr.mutex.RUnlock()
    
    instances, exists := sr.services[serviceName]
    if !exists {
        return nil, fmt.Errorf("服务 %s 未找到", serviceName)
    }
    
    // 过滤健康的服务实例
    healthyInstances := make([]*ServiceInstance, 0)
    for _, instance := range instances {
        if instance.Health == "UP" {
            healthyInstances = append(healthyInstances, instance)
        }
    }
    
    return healthyInstances, nil
}

// Deregister 注销服务
func (sr *ServiceRegistry) Deregister(serviceName, instanceID string) error {
    sr.mutex.Lock()
    defer sr.mutex.Unlock()
    
    instances, exists := sr.services[serviceName]
    if !exists {
        return fmt.Errorf("服务 %s 未找到", serviceName)
    }
    
    for i, instance := range instances {
        if instance.ID == instanceID {
            sr.services[serviceName] = append(instances[:i], instances[i+1:]...)
            log.Printf("服务注销成功: %s [%s]\n", serviceName, instanceID)
            return nil
        }
    }
    
    return fmt.Errorf("服务实例 %s 未找到", instanceID)
}

// HealthCheck 健康检查
func (sr *ServiceRegistry) HealthCheck(ctx context.Context) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            sr.performHealthCheck()
        }
    }
}

func (sr *ServiceRegistry) performHealthCheck() {
    sr.mutex.Lock()
    defer sr.mutex.Unlock()
    
    for serviceName, instances := range sr.services {
        for _, instance := range instances {
            // 模拟健康检查
            if sr.checkInstanceHealth(instance) {
                instance.Health = "UP"
            } else {
                instance.Health = "DOWN"
                log.Printf("服务实例不健康: %s [%s:%d]\n", 
                    serviceName, instance.Host, instance.Port)
            }
        }
    }
}

func (sr *ServiceRegistry) checkInstanceHealth(instance *ServiceInstance) bool {
    // 实际实现中会发送HTTP请求到健康检查端点
    // 这里简化为随机返回
    return true
}

// LoadBalancer 负载均衡器
type LoadBalancer struct {
    registry *ServiceRegistry
    strategy string // "round_robin", "random", "least_connections"
    counters map[string]int
    mutex    sync.Mutex
}

func NewLoadBalancer(registry *ServiceRegistry, strategy string) *LoadBalancer {
    return &LoadBalancer{
        registry: registry,
        strategy: strategy,
        counters: make(map[string]int),
    }
}

// SelectInstance 选择服务实例
func (lb *LoadBalancer) SelectInstance(serviceName string) (*ServiceInstance, error) {
    instances, err := lb.registry.Discover(serviceName)
    if err != nil {
        return nil, err
    }
    
    if len(instances) == 0 {
        return nil, fmt.Errorf("没有可用的 %s 服务实例", serviceName)
    }
    
    switch lb.strategy {
    case "round_robin":
        return lb.roundRobinSelect(serviceName, instances), nil
    case "random":
        return lb.randomSelect(instances), nil
    default:
        return instances[0], nil
    }
}

func (lb *LoadBalancer) roundRobinSelect(serviceName string, instances []*ServiceInstance) *ServiceInstance {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()
    
    counter := lb.counters[serviceName]
    instance := instances[counter%len(instances)]
    lb.counters[serviceName] = counter + 1
    
    return instance
}

func (lb *LoadBalancer) randomSelect(instances []*ServiceInstance) *ServiceInstance {
    return instances[time.Now().UnixNano()%int64(len(instances))]
}

func main() {
    // 创建服务注册中心
    registry := NewServiceRegistry()
    
    // 注册服务实例
    userService1 := &ServiceInstance{
        ID:     "user-service-1",
        Name:   "user-service",
        Host:   "192.168.1.10",
        Port:   8080,
        Health: "UP",
        Metadata: map[string]string{
            "version": "1.0.0",
            "region":  "us-west",
        },
    }
    
    userService2 := &ServiceInstance{
        ID:     "user-service-2",
        Name:   "user-service",
        Host:   "192.168.1.11",
        Port:   8080,
        Health: "UP",
        Metadata: map[string]string{
            "version": "1.0.0",
            "region":  "us-west",
        },
    }
    
    registry.Register(userService1)
    registry.Register(userService2)
    
    // 创建负载均衡器
    lb := NewLoadBalancer(registry, "round_robin")
    
    // 模拟服务发现和负载均衡
    for i := 0; i < 5; i++ {
        instance, err := lb.SelectInstance("user-service")
        if err != nil {
            log.Printf("选择服务实例失败: %v\n", err)
            continue
        }
        
        fmt.Printf("选择的服务实例: %s [%s:%d]\n", 
            instance.ID, instance.Host, instance.Port)
    }
    
    // 启动健康检查
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    go registry.HealthCheck(ctx)
    
    // 模拟运行一段时间
    time.Sleep(5 * time.Second)
}

API网关模式

# API网关配置示例
apiVersion: v1
kind: ConfigMap
metadata:
  name: api-gateway-config
  namespace: microservices
data:
  gateway.yaml: |
    server:
      port: 8080
      
    routes:
      - id: user-service
        uri: http://user-service:8080
        predicates:
          - Path=/api/users/**
        filters:
          - StripPrefix=2
          - AddRequestHeader=X-Gateway-Source, api-gateway
          
      - id: order-service
        uri: http://order-service:8080
        predicates:
          - Path=/api/orders/**
        filters:
          - StripPrefix=2
          - RateLimiter=redis
          
      - id: inventory-service
        uri: http://inventory-service:8080
        predicates:
          - Path=/api/inventory/**
        filters:
          - StripPrefix=2
          - CircuitBreaker=inventory-cb
    
    security:
      jwt:
        secret: your-secret-key
        expiration: 3600
      
      rate-limiting:
        redis:
          host: redis-service
          port: 6379
          requests-per-minute: 100
    
    circuit-breaker:
      inventory-cb:
        failure-threshold: 5
        timeout: 10s
        recovery-timeout: 30s
    
    cors:
      allowed-origins: "*"
      allowed-methods: "GET,POST,PUT,DELETE"
      allowed-headers: "*"
    
    logging:
      level: INFO
      access-log: true
      
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-gateway
  namespace: microservices
spec:
  replicas: 3
  selector:
    matchLabels:
      app: api-gateway
  template:
    metadata:
      labels:
        app: api-gateway
    spec:
      containers:
      - name: api-gateway
        image: api-gateway:latest
        ports:
        - containerPort: 8080
        env:
        - name: CONFIG_PATH
          value: "/config/gateway.yaml"
        volumeMounts:
        - name: config
          mountPath: /config
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
      volumes:
      - name: config
        configMap:
          name: api-gateway-config

1.5 微服务通信模式

同步通信

// REST API 客户端示例
class MicroserviceClient {
    constructor(baseUrl, timeout = 5000) {
        this.baseUrl = baseUrl;
        this.timeout = timeout;
        this.circuitBreaker = new CircuitBreaker();
    }
    
    async get(path, options = {}) {
        return this.request('GET', path, null, options);
    }
    
    async post(path, data, options = {}) {
        return this.request('POST', path, data, options);
    }
    
    async put(path, data, options = {}) {
        return this.request('PUT', path, data, options);
    }
    
    async delete(path, options = {}) {
        return this.request('DELETE', path, null, options);
    }
    
    async request(method, path, data, options) {
        const url = `${this.baseUrl}${path}`;
        
        // 熔断器检查
        if (this.circuitBreaker.isOpen()) {
            throw new Error('Circuit breaker is open');
        }
        
        const config = {
            method,
            headers: {
                'Content-Type': 'application/json',
                'X-Request-ID': this.generateRequestId(),
                ...options.headers
            },
            timeout: this.timeout
        };
        
        if (data) {
            config.body = JSON.stringify(data);
        }
        
        try {
            const response = await fetch(url, config);
            
            if (!response.ok) {
                throw new Error(`HTTP ${response.status}: ${response.statusText}`);
            }
            
            const result = await response.json();
            this.circuitBreaker.recordSuccess();
            return result;
            
        } catch (error) {
            this.circuitBreaker.recordFailure();
            throw error;
        }
    }
    
    generateRequestId() {
        return `req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
    }
}

// 熔断器实现
class CircuitBreaker {
    constructor(failureThreshold = 5, recoveryTimeout = 60000) {
        this.failureThreshold = failureThreshold;
        this.recoveryTimeout = recoveryTimeout;
        this.failureCount = 0;
        this.lastFailureTime = null;
        this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    }
    
    isOpen() {
        if (this.state === 'OPEN') {
            if (Date.now() - this.lastFailureTime > this.recoveryTimeout) {
                this.state = 'HALF_OPEN';
                return false;
            }
            return true;
        }
        return false;
    }
    
    recordSuccess() {
        this.failureCount = 0;
        this.state = 'CLOSED';
    }
    
    recordFailure() {
        this.failureCount++;
        this.lastFailureTime = Date.now();
        
        if (this.failureCount >= this.failureThreshold) {
            this.state = 'OPEN';
        }
    }
}

// 服务客户端使用示例
class UserServiceClient extends MicroserviceClient {
    constructor(serviceUrl) {
        super(serviceUrl);
    }
    
    async getUser(userId) {
        return this.get(`/users/${userId}`);
    }
    
    async createUser(userData) {
        return this.post('/users', userData);
    }
    
    async updateUser(userId, userData) {
        return this.put(`/users/${userId}`, userData);
    }
    
    async deleteUser(userId) {
        return this.delete(`/users/${userId}`);
    }
}

class OrderServiceClient extends MicroserviceClient {
    constructor(serviceUrl) {
        super(serviceUrl);
    }
    
    async createOrder(orderData) {
        return this.post('/orders', orderData);
    }
    
    async getOrder(orderId) {
        return this.get(`/orders/${orderId}`);
    }
    
    async getOrdersByUser(userId) {
        return this.get(`/orders?userId=${userId}`);
    }
}

// 使用示例
async function example() {
    const userClient = new UserServiceClient('http://user-service:8080');
    const orderClient = new OrderServiceClient('http://order-service:8080');
    
    try {
        // 获取用户信息
        const user = await userClient.getUser('user123');
        console.log('用户信息:', user);
        
        // 获取用户订单
        const orders = await orderClient.getOrdersByUser('user123');
        console.log('用户订单:', orders);
        
    } catch (error) {
        console.error('服务调用失败:', error.message);
    }
}

异步通信

# 事件驱动架构示例
import asyncio
import json
import uuid
from datetime import datetime
from typing import Dict, List, Callable
from dataclasses import dataclass, asdict
from abc import ABC, abstractmethod

@dataclass
class Event:
    """事件基类"""
    id: str
    type: str
    source: str
    timestamp: str
    data: Dict
    
    def to_json(self) -> str:
        return json.dumps(asdict(self))
    
    @classmethod
    def from_json(cls, json_str: str) -> 'Event':
        data = json.loads(json_str)
        return cls(**data)

@dataclass
class UserCreatedEvent(Event):
    """用户创建事件"""
    def __init__(self, user_id: str, user_data: Dict):
        super().__init__(
            id=str(uuid.uuid4()),
            type="user.created",
            source="user-service",
            timestamp=datetime.utcnow().isoformat(),
            data={"user_id": user_id, **user_data}
        )

@dataclass
class OrderCreatedEvent(Event):
    """订单创建事件"""
    def __init__(self, order_id: str, user_id: str, order_data: Dict):
        super().__init__(
            id=str(uuid.uuid4()),
            type="order.created",
            source="order-service",
            timestamp=datetime.utcnow().isoformat(),
            data={"order_id": order_id, "user_id": user_id, **order_data}
        )

class EventBus(ABC):
    """事件总线抽象类"""
    
    @abstractmethod
    async def publish(self, event: Event) -> None:
        """发布事件"""
        pass
    
    @abstractmethod
    async def subscribe(self, event_type: str, handler: Callable) -> None:
        """订阅事件"""
        pass

class InMemoryEventBus(EventBus):
    """内存事件总线实现"""
    
    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}
        self.events: List[Event] = []
    
    async def publish(self, event: Event) -> None:
        """发布事件"""
        self.events.append(event)
        print(f"发布事件: {event.type} [{event.id}]")
        
        # 通知订阅者
        if event.type in self.subscribers:
            for handler in self.subscribers[event.type]:
                try:
                    await handler(event)
                except Exception as e:
                    print(f"事件处理失败: {e}")
    
    async def subscribe(self, event_type: str, handler: Callable) -> None:
        """订阅事件"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)
        print(f"订阅事件: {event_type}")

class EventHandler:
    """事件处理器基类"""
    
    def __init__(self, service_name: str, event_bus: EventBus):
        self.service_name = service_name
        self.event_bus = event_bus
    
    async def handle_event(self, event: Event) -> None:
        """处理事件"""
        print(f"[{self.service_name}] 处理事件: {event.type} [{event.id}]")

class NotificationService(EventHandler):
    """通知服务"""
    
    def __init__(self, event_bus: EventBus):
        super().__init__("notification-service", event_bus)
        self.setup_subscriptions()
    
    async def setup_subscriptions(self):
        """设置事件订阅"""
        await self.event_bus.subscribe("user.created", self.handle_user_created)
        await self.event_bus.subscribe("order.created", self.handle_order_created)
    
    async def handle_user_created(self, event: Event):
        """处理用户创建事件"""
        await self.handle_event(event)
        user_data = event.data
        await self.send_welcome_email(user_data["user_id"], user_data.get("email"))
    
    async def handle_order_created(self, event: Event):
        """处理订单创建事件"""
        await self.handle_event(event)
        order_data = event.data
        await self.send_order_confirmation(order_data["user_id"], order_data["order_id"])
    
    async def send_welcome_email(self, user_id: str, email: str):
        """发送欢迎邮件"""
        print(f"发送欢迎邮件到 {email} (用户ID: {user_id})")
    
    async def send_order_confirmation(self, user_id: str, order_id: str):
        """发送订单确认"""
        print(f"发送订单确认 {order_id} 给用户 {user_id}")

class AnalyticsService(EventHandler):
    """分析服务"""
    
    def __init__(self, event_bus: EventBus):
        super().__init__("analytics-service", event_bus)
        self.user_count = 0
        self.order_count = 0
        self.setup_subscriptions()
    
    async def setup_subscriptions(self):
        """设置事件订阅"""
        await self.event_bus.subscribe("user.created", self.handle_user_created)
        await self.event_bus.subscribe("order.created", self.handle_order_created)
    
    async def handle_user_created(self, event: Event):
        """处理用户创建事件"""
        await self.handle_event(event)
        self.user_count += 1
        print(f"用户统计更新: 总用户数 {self.user_count}")
    
    async def handle_order_created(self, event: Event):
        """处理订单创建事件"""
        await self.handle_event(event)
        self.order_count += 1
        print(f"订单统计更新: 总订单数 {self.order_count}")

class UserService:
    """用户服务"""
    
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.users = {}
    
    async def create_user(self, user_data: Dict) -> str:
        """创建用户"""
        user_id = str(uuid.uuid4())
        self.users[user_id] = user_data
        
        # 发布用户创建事件
        event = UserCreatedEvent(user_id, user_data)
        await self.event_bus.publish(event)
        
        return user_id

class OrderService:
    """订单服务"""
    
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.orders = {}
    
    async def create_order(self, user_id: str, order_data: Dict) -> str:
        """创建订单"""
        order_id = str(uuid.uuid4())
        order_data["user_id"] = user_id
        self.orders[order_id] = order_data
        
        # 发布订单创建事件
        event = OrderCreatedEvent(order_id, user_id, order_data)
        await self.event_bus.publish(event)
        
        return order_id

# 使用示例
async def main():
    # 创建事件总线
    event_bus = InMemoryEventBus()
    
    # 创建服务
    user_service = UserService(event_bus)
    order_service = OrderService(event_bus)
    
    # 创建事件处理服务
    notification_service = NotificationService(event_bus)
    analytics_service = AnalyticsService(event_bus)
    
    # 模拟业务操作
    print("=== 创建用户 ===")
    user_id = await user_service.create_user({
        "username": "john_doe",
        "email": "john@example.com",
        "name": "John Doe"
    })
    
    await asyncio.sleep(0.1)  # 等待事件处理
    
    print("\n=== 创建订单 ===")
    order_id = await order_service.create_order(user_id, {
        "items": [{"product_id": "prod1", "quantity": 2}],
        "total": 99.99
    })
    
    await asyncio.sleep(0.1)  # 等待事件处理
    
    print("\n=== 再创建一个用户 ===")
    user_id2 = await user_service.create_user({
        "username": "jane_doe",
        "email": "jane@example.com",
        "name": "Jane Doe"
    })
    
    await asyncio.sleep(0.1)  # 等待事件处理

if __name__ == "__main__":
    asyncio.run(main())

1.6 总结

本章介绍了微服务架构的基础概念,包括:

核心概念

  • 微服务定义: 围绕业务功能构建的小型、自治的服务
  • 架构特征: 去中心化治理、故障隔离、演进式设计
  • 设计原则: 单一职责、服务边界、数据独立

关键模式

  • 服务发现: 动态定位和负载均衡
  • API网关: 统一入口和横切关注点
  • 通信模式: 同步REST/gRPC和异步事件驱动

实践要点

  • 边界设计: 基于业务领域划分服务
  • 数据管理: 每个服务独立的数据存储
  • 容错设计: 熔断器、超时、重试机制

下一章预告

下一章将深入探讨微服务的技术栈选择,包括: - 编程语言和框架选择 - 数据库技术选型 - 消息队列和事件流 - 容器化和编排技术

通过本章的学习,您应该对微服务架构有了全面的理解,为后续的技术实现打下坚实基础。