1.1 微服务架构概述
什么是微服务架构
微服务架构是一种将单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,并使用轻量级机制(通常是HTTP资源API)进行通信。这些服务围绕业务功能构建,并且可以由全自动部署机制独立部署。
微服务架构的核心特征
# 微服务架构特征清单
apiVersion: v1
kind: ConfigMap
metadata:
name: microservices-characteristics
namespace: architecture
data:
business-capability: "围绕业务功能组织服务"
decentralized-governance: "去中心化治理"
failure-isolation: "故障隔离"
evolutionary-design: "演进式设计"
infrastructure-automation: "基础设施自动化"
design-for-failure: "为失败而设计"
微服务 vs 单体架构
# 架构对比分析脚本
#!/usr/bin/env python3
import json
from typing import Dict, List
class ArchitectureComparison:
def __init__(self):
self.monolith_characteristics = {
"deployment": "单一部署单元",
"scaling": "整体扩展",
"technology": "统一技术栈",
"data": "共享数据库",
"team": "大型团队",
"complexity": "初期简单,后期复杂"
}
self.microservices_characteristics = {
"deployment": "独立部署",
"scaling": "按需扩展",
"technology": "技术多样性",
"data": "数据库分离",
"team": "小型自治团队",
"complexity": "分布式复杂性"
}
def compare_architectures(self) -> Dict:
"""比较两种架构的特点"""
comparison = {
"monolith": self.monolith_characteristics,
"microservices": self.microservices_characteristics,
"decision_factors": {
"team_size": "团队规模和组织结构",
"system_complexity": "系统复杂度",
"scalability_requirements": "扩展性需求",
"deployment_frequency": "部署频率",
"technology_diversity": "技术多样性需求"
}
}
return comparison
def generate_decision_matrix(self) -> str:
"""生成架构选择决策矩阵"""
matrix = """
架构选择决策矩阵:
| 因素 | 单体架构 | 微服务架构 |
|------|----------|------------|
| 团队规模 | < 10人 | > 10人 |
| 系统复杂度 | 中低 | 高 |
| 部署频率 | 低 | 高 |
| 扩展需求 | 统一扩展 | 差异化扩展 |
| 技术栈 | 统一 | 多样化 |
| 运维成本 | 低 | 高 |
| 开发速度 | 初期快 | 长期快 |
"""
return matrix
if __name__ == "__main__":
comparison = ArchitectureComparison()
result = comparison.compare_architectures()
print(json.dumps(result, ensure_ascii=False, indent=2))
print(comparison.generate_decision_matrix())
1.2 微服务架构的优势与挑战
优势分析
// 微服务优势分析
package main
import (
"fmt"
"time"
)
type MicroservicesBenefit struct {
Name string
Description string
Impact string
Examples []string
}
func main() {
benefits := []MicroservicesBenefit{
{
Name: "技术多样性",
Description: "每个服务可以选择最适合的技术栈",
Impact: "提高开发效率和系统性能",
Examples: []string{"用户服务用Java", "推荐服务用Python", "实时服务用Go"},
},
{
Name: "独立部署",
Description: "服务可以独立发布和部署",
Impact: "降低部署风险,提高发布频率",
Examples: []string{"蓝绿部署", "金丝雀发布", "滚动更新"},
},
{
Name: "故障隔离",
Description: "单个服务故障不影响整个系统",
Impact: "提高系统可用性和稳定性",
Examples: []string{"熔断器", "舱壁模式", "超时控制"},
},
{
Name: "团队自治",
Description: "小团队负责端到端的服务开发",
Impact: "提高团队效率和责任感",
Examples: []string{"DevOps实践", "全栈开发", "业务对齐"},
},
}
fmt.Println("微服务架构优势分析:")
for _, benefit := range benefits {
fmt.Printf("\n优势: %s\n", benefit.Name)
fmt.Printf("描述: %s\n", benefit.Description)
fmt.Printf("影响: %s\n", benefit.Impact)
fmt.Printf("示例: %v\n", benefit.Examples)
}
}
挑战与解决方案
# 微服务挑战与解决方案配置
apiVersion: v1
kind: ConfigMap
metadata:
name: microservices-challenges
namespace: architecture
data:
distributed-complexity: |
挑战: 分布式系统复杂性
解决方案:
- 服务网格 (Istio, Linkerd)
- API网关 (Kong, Zuul)
- 配置中心 (Consul, Nacos)
data-consistency: |
挑战: 数据一致性
解决方案:
- 事件驱动架构
- Saga模式
- 最终一致性
service-communication: |
挑战: 服务间通信
解决方案:
- 同步通信 (HTTP/gRPC)
- 异步通信 (消息队列)
- 服务发现
monitoring-observability: |
挑战: 监控和可观测性
解决方案:
- 分布式追踪 (Jaeger, Zipkin)
- 指标监控 (Prometheus)
- 日志聚合 (ELK Stack)
testing-complexity: |
挑战: 测试复杂性
解决方案:
- 契约测试
- 端到端测试
- 混沌工程
1.3 微服务设计原则
单一职责原则
// 服务职责定义示例
public class ServiceResponsibilityAnalyzer {
public static class UserService {
// 单一职责: 用户管理
public User createUser(UserRequest request) {
// 用户创建逻辑
return new User();
}
public User getUserById(String userId) {
// 用户查询逻辑
return new User();
}
public void updateUser(String userId, UserRequest request) {
// 用户更新逻辑
}
public void deleteUser(String userId) {
// 用户删除逻辑
}
}
public static class OrderService {
// 单一职责: 订单管理
public Order createOrder(OrderRequest request) {
// 订单创建逻辑
return new Order();
}
public Order getOrderById(String orderId) {
// 订单查询逻辑
return new Order();
}
public void updateOrderStatus(String orderId, OrderStatus status) {
// 订单状态更新逻辑
}
}
// 反例: 违反单一职责原则
public static class MonolithService {
// 错误: 混合了用户和订单职责
public User createUser(UserRequest request) { return new User(); }
public Order createOrder(OrderRequest request) { return new Order(); }
public void sendEmail(String email, String content) { }
public void generateReport() { }
}
}
服务边界设计
# 服务边界设计工具
class ServiceBoundaryDesigner:
def __init__(self):
self.domain_model = {
"user_management": {
"entities": ["User", "Profile", "Preference"],
"operations": ["register", "login", "update_profile"],
"data": ["user_data", "authentication_data"]
},
"order_management": {
"entities": ["Order", "OrderItem", "Payment"],
"operations": ["create_order", "process_payment", "track_order"],
"data": ["order_data", "payment_data"]
},
"inventory_management": {
"entities": ["Product", "Stock", "Warehouse"],
"operations": ["check_stock", "reserve_item", "update_inventory"],
"data": ["product_data", "inventory_data"]
}
}
def analyze_service_boundaries(self):
"""分析服务边界"""
boundaries = {}
for domain, details in self.domain_model.items():
boundaries[domain] = {
"service_name": domain.replace("_", "-") + "-service",
"bounded_context": details,
"interfaces": self._define_interfaces(details),
"dependencies": self._analyze_dependencies(domain)
}
return boundaries
def _define_interfaces(self, details):
"""定义服务接口"""
return {
"rest_api": [f"/{op}" for op in details["operations"]],
"events": [f"{entity.lower()}_changed" for entity in details["entities"]],
"commands": details["operations"]
}
def _analyze_dependencies(self, domain):
"""分析服务依赖"""
dependencies = {
"user_management": [],
"order_management": ["user_management", "inventory_management"],
"inventory_management": []
}
return dependencies.get(domain, [])
def generate_service_map(self):
"""生成服务地图"""
boundaries = self.analyze_service_boundaries()
service_map = {
"services": list(boundaries.keys()),
"relationships": {},
"communication_patterns": {
"synchronous": ["REST API", "gRPC"],
"asynchronous": ["Event Streaming", "Message Queue"]
}
}
for service, details in boundaries.items():
service_map["relationships"][service] = details["dependencies"]
return service_map
# 使用示例
designer = ServiceBoundaryDesigner()
boundaries = designer.analyze_service_boundaries()
service_map = designer.generate_service_map()
print("服务边界分析结果:")
for service, boundary in boundaries.items():
print(f"\n服务: {boundary['service_name']}")
print(f"领域: {service}")
print(f"实体: {boundary['bounded_context']['entities']}")
print(f"操作: {boundary['bounded_context']['operations']}")
print(f"依赖: {boundary['dependencies']}")
数据管理原则
-- 微服务数据管理示例
-- 用户服务数据库
CREATE DATABASE user_service_db;
USE user_service_db;
CREATE TABLE users (
id VARCHAR(36) PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
CREATE TABLE user_profiles (
user_id VARCHAR(36) PRIMARY KEY,
first_name VARCHAR(50),
last_name VARCHAR(50),
phone VARCHAR(20),
address TEXT,
FOREIGN KEY (user_id) REFERENCES users(id)
);
-- 订单服务数据库
CREATE DATABASE order_service_db;
USE order_service_db;
CREATE TABLE orders (
id VARCHAR(36) PRIMARY KEY,
user_id VARCHAR(36) NOT NULL, -- 外部引用,不使用外键
status ENUM('pending', 'confirmed', 'shipped', 'delivered', 'cancelled') DEFAULT 'pending',
total_amount DECIMAL(10, 2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
CREATE TABLE order_items (
id VARCHAR(36) PRIMARY KEY,
order_id VARCHAR(36) NOT NULL,
product_id VARCHAR(36) NOT NULL, -- 外部引用
quantity INT NOT NULL,
unit_price DECIMAL(10, 2) NOT NULL,
FOREIGN KEY (order_id) REFERENCES orders(id)
);
-- 库存服务数据库
CREATE DATABASE inventory_service_db;
USE inventory_service_db;
CREATE TABLE products (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(100) NOT NULL,
description TEXT,
price DECIMAL(10, 2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE inventory (
product_id VARCHAR(36) PRIMARY KEY,
available_quantity INT NOT NULL DEFAULT 0,
reserved_quantity INT NOT NULL DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (product_id) REFERENCES products(id)
);
1.4 微服务架构模式
服务发现模式
// 服务发现实现示例
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// ServiceInstance 服务实例
type ServiceInstance struct {
ID string `json:"id"`
Name string `json:"name"`
Host string `json:"host"`
Port int `json:"port"`
Metadata map[string]string `json:"metadata"`
Health string `json:"health"`
}
// ServiceRegistry 服务注册中心
type ServiceRegistry struct {
services map[string][]*ServiceInstance
mutex sync.RWMutex
}
func NewServiceRegistry() *ServiceRegistry {
return &ServiceRegistry{
services: make(map[string][]*ServiceInstance),
}
}
// Register 注册服务
func (sr *ServiceRegistry) Register(instance *ServiceInstance) error {
sr.mutex.Lock()
defer sr.mutex.Unlock()
if sr.services[instance.Name] == nil {
sr.services[instance.Name] = make([]*ServiceInstance, 0)
}
sr.services[instance.Name] = append(sr.services[instance.Name], instance)
log.Printf("服务注册成功: %s [%s:%d]\n", instance.Name, instance.Host, instance.Port)
return nil
}
// Discover 发现服务
func (sr *ServiceRegistry) Discover(serviceName string) ([]*ServiceInstance, error) {
sr.mutex.RLock()
defer sr.mutex.RUnlock()
instances, exists := sr.services[serviceName]
if !exists {
return nil, fmt.Errorf("服务 %s 未找到", serviceName)
}
// 过滤健康的服务实例
healthyInstances := make([]*ServiceInstance, 0)
for _, instance := range instances {
if instance.Health == "UP" {
healthyInstances = append(healthyInstances, instance)
}
}
return healthyInstances, nil
}
// Deregister 注销服务
func (sr *ServiceRegistry) Deregister(serviceName, instanceID string) error {
sr.mutex.Lock()
defer sr.mutex.Unlock()
instances, exists := sr.services[serviceName]
if !exists {
return fmt.Errorf("服务 %s 未找到", serviceName)
}
for i, instance := range instances {
if instance.ID == instanceID {
sr.services[serviceName] = append(instances[:i], instances[i+1:]...)
log.Printf("服务注销成功: %s [%s]\n", serviceName, instanceID)
return nil
}
}
return fmt.Errorf("服务实例 %s 未找到", instanceID)
}
// HealthCheck 健康检查
func (sr *ServiceRegistry) HealthCheck(ctx context.Context) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
sr.performHealthCheck()
}
}
}
func (sr *ServiceRegistry) performHealthCheck() {
sr.mutex.Lock()
defer sr.mutex.Unlock()
for serviceName, instances := range sr.services {
for _, instance := range instances {
// 模拟健康检查
if sr.checkInstanceHealth(instance) {
instance.Health = "UP"
} else {
instance.Health = "DOWN"
log.Printf("服务实例不健康: %s [%s:%d]\n",
serviceName, instance.Host, instance.Port)
}
}
}
}
func (sr *ServiceRegistry) checkInstanceHealth(instance *ServiceInstance) bool {
// 实际实现中会发送HTTP请求到健康检查端点
// 这里简化为随机返回
return true
}
// LoadBalancer 负载均衡器
type LoadBalancer struct {
registry *ServiceRegistry
strategy string // "round_robin", "random", "least_connections"
counters map[string]int
mutex sync.Mutex
}
func NewLoadBalancer(registry *ServiceRegistry, strategy string) *LoadBalancer {
return &LoadBalancer{
registry: registry,
strategy: strategy,
counters: make(map[string]int),
}
}
// SelectInstance 选择服务实例
func (lb *LoadBalancer) SelectInstance(serviceName string) (*ServiceInstance, error) {
instances, err := lb.registry.Discover(serviceName)
if err != nil {
return nil, err
}
if len(instances) == 0 {
return nil, fmt.Errorf("没有可用的 %s 服务实例", serviceName)
}
switch lb.strategy {
case "round_robin":
return lb.roundRobinSelect(serviceName, instances), nil
case "random":
return lb.randomSelect(instances), nil
default:
return instances[0], nil
}
}
func (lb *LoadBalancer) roundRobinSelect(serviceName string, instances []*ServiceInstance) *ServiceInstance {
lb.mutex.Lock()
defer lb.mutex.Unlock()
counter := lb.counters[serviceName]
instance := instances[counter%len(instances)]
lb.counters[serviceName] = counter + 1
return instance
}
func (lb *LoadBalancer) randomSelect(instances []*ServiceInstance) *ServiceInstance {
return instances[time.Now().UnixNano()%int64(len(instances))]
}
func main() {
// 创建服务注册中心
registry := NewServiceRegistry()
// 注册服务实例
userService1 := &ServiceInstance{
ID: "user-service-1",
Name: "user-service",
Host: "192.168.1.10",
Port: 8080,
Health: "UP",
Metadata: map[string]string{
"version": "1.0.0",
"region": "us-west",
},
}
userService2 := &ServiceInstance{
ID: "user-service-2",
Name: "user-service",
Host: "192.168.1.11",
Port: 8080,
Health: "UP",
Metadata: map[string]string{
"version": "1.0.0",
"region": "us-west",
},
}
registry.Register(userService1)
registry.Register(userService2)
// 创建负载均衡器
lb := NewLoadBalancer(registry, "round_robin")
// 模拟服务发现和负载均衡
for i := 0; i < 5; i++ {
instance, err := lb.SelectInstance("user-service")
if err != nil {
log.Printf("选择服务实例失败: %v\n", err)
continue
}
fmt.Printf("选择的服务实例: %s [%s:%d]\n",
instance.ID, instance.Host, instance.Port)
}
// 启动健康检查
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go registry.HealthCheck(ctx)
// 模拟运行一段时间
time.Sleep(5 * time.Second)
}
API网关模式
# API网关配置示例
apiVersion: v1
kind: ConfigMap
metadata:
name: api-gateway-config
namespace: microservices
data:
gateway.yaml: |
server:
port: 8080
routes:
- id: user-service
uri: http://user-service:8080
predicates:
- Path=/api/users/**
filters:
- StripPrefix=2
- AddRequestHeader=X-Gateway-Source, api-gateway
- id: order-service
uri: http://order-service:8080
predicates:
- Path=/api/orders/**
filters:
- StripPrefix=2
- RateLimiter=redis
- id: inventory-service
uri: http://inventory-service:8080
predicates:
- Path=/api/inventory/**
filters:
- StripPrefix=2
- CircuitBreaker=inventory-cb
security:
jwt:
secret: your-secret-key
expiration: 3600
rate-limiting:
redis:
host: redis-service
port: 6379
requests-per-minute: 100
circuit-breaker:
inventory-cb:
failure-threshold: 5
timeout: 10s
recovery-timeout: 30s
cors:
allowed-origins: "*"
allowed-methods: "GET,POST,PUT,DELETE"
allowed-headers: "*"
logging:
level: INFO
access-log: true
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: api-gateway
namespace: microservices
spec:
replicas: 3
selector:
matchLabels:
app: api-gateway
template:
metadata:
labels:
app: api-gateway
spec:
containers:
- name: api-gateway
image: api-gateway:latest
ports:
- containerPort: 8080
env:
- name: CONFIG_PATH
value: "/config/gateway.yaml"
volumeMounts:
- name: config
mountPath: /config
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
volumes:
- name: config
configMap:
name: api-gateway-config
1.5 微服务通信模式
同步通信
// REST API 客户端示例
class MicroserviceClient {
constructor(baseUrl, timeout = 5000) {
this.baseUrl = baseUrl;
this.timeout = timeout;
this.circuitBreaker = new CircuitBreaker();
}
async get(path, options = {}) {
return this.request('GET', path, null, options);
}
async post(path, data, options = {}) {
return this.request('POST', path, data, options);
}
async put(path, data, options = {}) {
return this.request('PUT', path, data, options);
}
async delete(path, options = {}) {
return this.request('DELETE', path, null, options);
}
async request(method, path, data, options) {
const url = `${this.baseUrl}${path}`;
// 熔断器检查
if (this.circuitBreaker.isOpen()) {
throw new Error('Circuit breaker is open');
}
const config = {
method,
headers: {
'Content-Type': 'application/json',
'X-Request-ID': this.generateRequestId(),
...options.headers
},
timeout: this.timeout
};
if (data) {
config.body = JSON.stringify(data);
}
try {
const response = await fetch(url, config);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const result = await response.json();
this.circuitBreaker.recordSuccess();
return result;
} catch (error) {
this.circuitBreaker.recordFailure();
throw error;
}
}
generateRequestId() {
return `req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
}
// 熔断器实现
class CircuitBreaker {
constructor(failureThreshold = 5, recoveryTimeout = 60000) {
this.failureThreshold = failureThreshold;
this.recoveryTimeout = recoveryTimeout;
this.failureCount = 0;
this.lastFailureTime = null;
this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
}
isOpen() {
if (this.state === 'OPEN') {
if (Date.now() - this.lastFailureTime > this.recoveryTimeout) {
this.state = 'HALF_OPEN';
return false;
}
return true;
}
return false;
}
recordSuccess() {
this.failureCount = 0;
this.state = 'CLOSED';
}
recordFailure() {
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.failureCount >= this.failureThreshold) {
this.state = 'OPEN';
}
}
}
// 服务客户端使用示例
class UserServiceClient extends MicroserviceClient {
constructor(serviceUrl) {
super(serviceUrl);
}
async getUser(userId) {
return this.get(`/users/${userId}`);
}
async createUser(userData) {
return this.post('/users', userData);
}
async updateUser(userId, userData) {
return this.put(`/users/${userId}`, userData);
}
async deleteUser(userId) {
return this.delete(`/users/${userId}`);
}
}
class OrderServiceClient extends MicroserviceClient {
constructor(serviceUrl) {
super(serviceUrl);
}
async createOrder(orderData) {
return this.post('/orders', orderData);
}
async getOrder(orderId) {
return this.get(`/orders/${orderId}`);
}
async getOrdersByUser(userId) {
return this.get(`/orders?userId=${userId}`);
}
}
// 使用示例
async function example() {
const userClient = new UserServiceClient('http://user-service:8080');
const orderClient = new OrderServiceClient('http://order-service:8080');
try {
// 获取用户信息
const user = await userClient.getUser('user123');
console.log('用户信息:', user);
// 获取用户订单
const orders = await orderClient.getOrdersByUser('user123');
console.log('用户订单:', orders);
} catch (error) {
console.error('服务调用失败:', error.message);
}
}
异步通信
# 事件驱动架构示例
import asyncio
import json
import uuid
from datetime import datetime
from typing import Dict, List, Callable
from dataclasses import dataclass, asdict
from abc import ABC, abstractmethod
@dataclass
class Event:
"""事件基类"""
id: str
type: str
source: str
timestamp: str
data: Dict
def to_json(self) -> str:
return json.dumps(asdict(self))
@classmethod
def from_json(cls, json_str: str) -> 'Event':
data = json.loads(json_str)
return cls(**data)
@dataclass
class UserCreatedEvent(Event):
"""用户创建事件"""
def __init__(self, user_id: str, user_data: Dict):
super().__init__(
id=str(uuid.uuid4()),
type="user.created",
source="user-service",
timestamp=datetime.utcnow().isoformat(),
data={"user_id": user_id, **user_data}
)
@dataclass
class OrderCreatedEvent(Event):
"""订单创建事件"""
def __init__(self, order_id: str, user_id: str, order_data: Dict):
super().__init__(
id=str(uuid.uuid4()),
type="order.created",
source="order-service",
timestamp=datetime.utcnow().isoformat(),
data={"order_id": order_id, "user_id": user_id, **order_data}
)
class EventBus(ABC):
"""事件总线抽象类"""
@abstractmethod
async def publish(self, event: Event) -> None:
"""发布事件"""
pass
@abstractmethod
async def subscribe(self, event_type: str, handler: Callable) -> None:
"""订阅事件"""
pass
class InMemoryEventBus(EventBus):
"""内存事件总线实现"""
def __init__(self):
self.subscribers: Dict[str, List[Callable]] = {}
self.events: List[Event] = []
async def publish(self, event: Event) -> None:
"""发布事件"""
self.events.append(event)
print(f"发布事件: {event.type} [{event.id}]")
# 通知订阅者
if event.type in self.subscribers:
for handler in self.subscribers[event.type]:
try:
await handler(event)
except Exception as e:
print(f"事件处理失败: {e}")
async def subscribe(self, event_type: str, handler: Callable) -> None:
"""订阅事件"""
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(handler)
print(f"订阅事件: {event_type}")
class EventHandler:
"""事件处理器基类"""
def __init__(self, service_name: str, event_bus: EventBus):
self.service_name = service_name
self.event_bus = event_bus
async def handle_event(self, event: Event) -> None:
"""处理事件"""
print(f"[{self.service_name}] 处理事件: {event.type} [{event.id}]")
class NotificationService(EventHandler):
"""通知服务"""
def __init__(self, event_bus: EventBus):
super().__init__("notification-service", event_bus)
self.setup_subscriptions()
async def setup_subscriptions(self):
"""设置事件订阅"""
await self.event_bus.subscribe("user.created", self.handle_user_created)
await self.event_bus.subscribe("order.created", self.handle_order_created)
async def handle_user_created(self, event: Event):
"""处理用户创建事件"""
await self.handle_event(event)
user_data = event.data
await self.send_welcome_email(user_data["user_id"], user_data.get("email"))
async def handle_order_created(self, event: Event):
"""处理订单创建事件"""
await self.handle_event(event)
order_data = event.data
await self.send_order_confirmation(order_data["user_id"], order_data["order_id"])
async def send_welcome_email(self, user_id: str, email: str):
"""发送欢迎邮件"""
print(f"发送欢迎邮件到 {email} (用户ID: {user_id})")
async def send_order_confirmation(self, user_id: str, order_id: str):
"""发送订单确认"""
print(f"发送订单确认 {order_id} 给用户 {user_id}")
class AnalyticsService(EventHandler):
"""分析服务"""
def __init__(self, event_bus: EventBus):
super().__init__("analytics-service", event_bus)
self.user_count = 0
self.order_count = 0
self.setup_subscriptions()
async def setup_subscriptions(self):
"""设置事件订阅"""
await self.event_bus.subscribe("user.created", self.handle_user_created)
await self.event_bus.subscribe("order.created", self.handle_order_created)
async def handle_user_created(self, event: Event):
"""处理用户创建事件"""
await self.handle_event(event)
self.user_count += 1
print(f"用户统计更新: 总用户数 {self.user_count}")
async def handle_order_created(self, event: Event):
"""处理订单创建事件"""
await self.handle_event(event)
self.order_count += 1
print(f"订单统计更新: 总订单数 {self.order_count}")
class UserService:
"""用户服务"""
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.users = {}
async def create_user(self, user_data: Dict) -> str:
"""创建用户"""
user_id = str(uuid.uuid4())
self.users[user_id] = user_data
# 发布用户创建事件
event = UserCreatedEvent(user_id, user_data)
await self.event_bus.publish(event)
return user_id
class OrderService:
"""订单服务"""
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.orders = {}
async def create_order(self, user_id: str, order_data: Dict) -> str:
"""创建订单"""
order_id = str(uuid.uuid4())
order_data["user_id"] = user_id
self.orders[order_id] = order_data
# 发布订单创建事件
event = OrderCreatedEvent(order_id, user_id, order_data)
await self.event_bus.publish(event)
return order_id
# 使用示例
async def main():
# 创建事件总线
event_bus = InMemoryEventBus()
# 创建服务
user_service = UserService(event_bus)
order_service = OrderService(event_bus)
# 创建事件处理服务
notification_service = NotificationService(event_bus)
analytics_service = AnalyticsService(event_bus)
# 模拟业务操作
print("=== 创建用户 ===")
user_id = await user_service.create_user({
"username": "john_doe",
"email": "john@example.com",
"name": "John Doe"
})
await asyncio.sleep(0.1) # 等待事件处理
print("\n=== 创建订单 ===")
order_id = await order_service.create_order(user_id, {
"items": [{"product_id": "prod1", "quantity": 2}],
"total": 99.99
})
await asyncio.sleep(0.1) # 等待事件处理
print("\n=== 再创建一个用户 ===")
user_id2 = await user_service.create_user({
"username": "jane_doe",
"email": "jane@example.com",
"name": "Jane Doe"
})
await asyncio.sleep(0.1) # 等待事件处理
if __name__ == "__main__":
asyncio.run(main())
1.6 总结
本章介绍了微服务架构的基础概念,包括:
核心概念
- 微服务定义: 围绕业务功能构建的小型、自治的服务
- 架构特征: 去中心化治理、故障隔离、演进式设计
- 设计原则: 单一职责、服务边界、数据独立
关键模式
- 服务发现: 动态定位和负载均衡
- API网关: 统一入口和横切关注点
- 通信模式: 同步REST/gRPC和异步事件驱动
实践要点
- 边界设计: 基于业务领域划分服务
- 数据管理: 每个服务独立的数据存储
- 容错设计: 熔断器、超时、重试机制
下一章预告
下一章将深入探讨微服务的技术栈选择,包括: - 编程语言和框架选择 - 数据库技术选型 - 消息队列和事件流 - 容器化和编排技术
通过本章的学习,您应该对微服务架构有了全面的理解,为后续的技术实现打下坚实基础。