2.1 编程语言与框架选择

主流编程语言对比

# 编程语言选择分析工具
class LanguageSelector:
    def __init__(self):
        self.languages = {
            "Java": {
                "优势": ["成熟生态", "企业级支持", "强类型", "JVM性能"],
                "劣势": ["启动时间长", "内存占用大", "语法冗长"],
                "适用场景": ["企业级应用", "大型系统", "金融服务"],
                "框架": ["Spring Boot", "Micronaut", "Quarkus"],
                "性能": "高",
                "学习曲线": "中等",
                "社区活跃度": "很高"
            },
            "Go": {
                "优势": ["高性能", "并发支持", "快速编译", "小内存占用"],
                "劣势": ["生态相对较新", "泛型支持有限", "错误处理冗长"],
                "适用场景": ["云原生应用", "API服务", "系统工具"],
                "框架": ["Gin", "Echo", "Fiber", "Go-kit"],
                "性能": "很高",
                "学习曲线": "低",
                "社区活跃度": "高"
            },
            "Node.js": {
                "优势": ["JavaScript统一", "异步IO", "快速开发", "丰富包管理"],
                "劣势": ["单线程限制", "CPU密集型性能差", "回调地狱"],
                "适用场景": ["前端BFF", "实时应用", "快速原型"],
                "框架": ["Express", "Koa", "NestJS", "Fastify"],
                "性能": "中等",
                "学习曲线": "低",
                "社区活跃度": "很高"
            },
            "Python": {
                "优势": ["简洁语法", "丰富库", "AI/ML支持", "快速开发"],
                "劣势": ["性能较低", "GIL限制", "部署复杂"],
                "适用场景": ["数据服务", "AI/ML服务", "脚本工具"],
                "框架": ["FastAPI", "Django", "Flask", "Tornado"],
                "性能": "中低",
                "学习曲线": "很低",
                "社区活跃度": "很高"
            },
            "C#": {
                "优势": ["强类型", "优秀工具", "跨平台", "企业支持"],
                "劣势": ["微软生态绑定", "许可成本", "Linux支持较新"],
                "适用场景": ["企业应用", "Windows环境", "混合云"],
                "框架": [".NET Core", "ASP.NET Core", "Orleans"],
                "性能": "高",
                "学习曲线": "中等",
                "社区活跃度": "高"
            }
        }
    
    def analyze_requirements(self, requirements):
        """根据需求分析推荐语言"""
        scores = {}
        
        for lang, details in self.languages.items():
            score = 0
            
            # 性能要求
            if requirements.get("performance") == "high":
                if details["性能"] in ["高", "很高"]:
                    score += 3
            
            # 开发速度要求
            if requirements.get("development_speed") == "fast":
                if details["学习曲线"] in ["低", "很低"]:
                    score += 2
            
            # 团队经验
            team_experience = requirements.get("team_experience", [])
            if lang.lower() in [exp.lower() for exp in team_experience]:
                score += 3
            
            # 生态系统要求
            if requirements.get("ecosystem") == "mature":
                if details["社区活跃度"] == "很高":
                    score += 2
            
            scores[lang] = score
        
        # 排序推荐
        recommendations = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        return recommendations
    
    def generate_comparison_report(self):
        """生成语言对比报告"""
        report = "# 微服务编程语言对比报告\n\n"
        
        for lang, details in self.languages.items():
            report += f"## {lang}\n\n"
            report += f"**性能**: {details['性能']}\n"
            report += f"**学习曲线**: {details['学习曲线']}\n"
            report += f"**社区活跃度**: {details['社区活跃度']}\n\n"
            
            report += "**优势**:\n"
            for advantage in details["优势"]:
                report += f"- {advantage}\n"
            
            report += "\n**劣势**:\n"
            for disadvantage in details["劣势"]:
                report += f"- {disadvantage}\n"
            
            report += "\n**适用场景**:\n"
            for scenario in details["适用场景"]:
                report += f"- {scenario}\n"
            
            report += "\n**推荐框架**:\n"
            for framework in details["框架"]:
                report += f"- {framework}\n"
            
            report += "\n---\n\n"
        
        return report

# 使用示例
selector = LanguageSelector()

# 分析需求
requirements = {
    "performance": "high",
    "development_speed": "fast",
    "team_experience": ["Java", "Go"],
    "ecosystem": "mature"
}

recommendations = selector.analyze_requirements(requirements)
print("语言推荐排序:")
for lang, score in recommendations:
    print(f"{lang}: {score}分")

# 生成报告
report = selector.generate_comparison_report()
print("\n" + report)

框架选择指南

// Go微服务框架对比
package main

import (
    "encoding/json"
    "fmt"
    "log"
)

type Framework struct {
    Name         string   `json:"name"`
    Type         string   `json:"type"`
    Performance  string   `json:"performance"`
    Features     []string `json:"features"`
    Pros         []string `json:"pros"`
    Cons         []string `json:"cons"`
    UseCase      []string `json:"use_case"`
    Learning     string   `json:"learning_curve"`
    Community    string   `json:"community"`
}

func main() {
    frameworks := []Framework{
        {
            Name:        "Gin",
            Type:        "HTTP Web Framework",
            Performance: "很高",
            Features:    []string{"路由", "中间件", "JSON绑定", "验证"},
            Pros:        []string{"轻量级", "高性能", "简单易用", "丰富中间件"},
            Cons:        []string{"功能相对简单", "缺少高级特性"},
            UseCase:     []string{"REST API", "Web服务", "快速原型"},
            Learning:    "低",
            Community:   "很高",
        },
        {
            Name:        "Echo",
            Type:        "HTTP Web Framework",
            Performance: "高",
            Features:    []string{"路由", "中间件", "数据绑定", "模板"},
            Pros:        []string{"功能丰富", "文档完善", "扩展性好"},
            Cons:        []string{"相对复杂", "学习成本高"},
            UseCase:     []string{"企业应用", "复杂API", "全功能Web"},
            Learning:    "中等",
            Community:   "高",
        },
        {
            Name:        "Go-kit",
            Type:        "Microservice Toolkit",
            Performance: "高",
            Features:    []string{"服务发现", "负载均衡", "熔断器", "限流", "追踪"},
            Pros:        []string{"微服务专用", "功能全面", "可插拔架构"},
            Cons:        []string{"复杂度高", "学习曲线陡峭"},
            UseCase:     []string{"大型微服务", "企业级系统", "云原生应用"},
            Learning:    "高",
            Community:   "中等",
        },
        {
            Name:        "Kratos",
            Type:        "Microservice Framework",
            Performance: "高",
            Features:    []string{"gRPC", "HTTP", "配置管理", "日志", "追踪", "指标"},
            Pros:        []string{"B站开源", "生产验证", "功能完整", "中文文档"},
            Cons:        []string{"相对较新", "社区较小"},
            UseCase:     []string{"企业微服务", "云原生应用", "gRPC服务"},
            Learning:    "中等",
            Community:   "中等",
        },
    }
    
    fmt.Println("Go微服务框架对比分析:")
    for _, fw := range frameworks {
        data, _ := json.MarshalIndent(fw, "", "  ")
        fmt.Printf("\n%s\n", string(data))
    }
    
    // 框架选择建议
    recommendations := map[string]string{
        "快速开发": "Gin - 简单易用,快速上手",
        "功能丰富": "Echo - 功能完整,文档详细",
        "微服务架构": "Go-kit - 专为微服务设计",
        "企业级应用": "Kratos - 生产验证,功能全面",
    }
    
    fmt.Println("\n框架选择建议:")
    for scenario, recommendation := range recommendations {
        fmt.Printf("%s: %s\n", scenario, recommendation)
    }
}

2.2 数据库技术选型

数据库类型对比

# 数据库选型配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: database-selection-guide
  namespace: microservices
data:
  relational-databases: |
    关系型数据库:
    
    MySQL:
      优势: 成熟稳定、社区活跃、性能优秀、易于使用
      劣势: 扩展性有限、复杂查询性能一般
      适用场景: 事务性应用、传统业务系统、中小型应用
      配置示例:
        driver: mysql
        host: mysql-service
        port: 3306
        database: microservice_db
        charset: utf8mb4
        max_connections: 100
    
    PostgreSQL:
      优势: 功能强大、标准兼容、扩展性好、支持JSON
      劣势: 配置复杂、内存占用较大
      适用场景: 复杂查询、地理信息、JSON数据、企业应用
      配置示例:
        driver: postgres
        host: postgres-service
        port: 5432
        database: microservice_db
        sslmode: disable
        max_connections: 50
  
  nosql-databases: |
    NoSQL数据库:
    
    MongoDB:
      优势: 文档存储、灵活schema、水平扩展、丰富查询
      劣势: 内存占用大、事务支持有限
      适用场景: 内容管理、实时分析、物联网、快速开发
      配置示例:
        uri: mongodb://mongo-service:27017
        database: microservice_db
        collection_prefix: ms_
        max_pool_size: 100
    
    Redis:
      优势: 高性能、丰富数据结构、持久化、集群支持
      劣势: 内存限制、数据结构相对简单
      适用场景: 缓存、会话存储、实时计算、消息队列
      配置示例:
        host: redis-service
        port: 6379
        database: 0
        max_connections: 100
        timeout: 5s
    
    Cassandra:
      优势: 高可用、线性扩展、无单点故障、写性能优秀
      劣势: 最终一致性、查询限制、运维复杂
      适用场景: 大数据、时序数据、高写入量、分布式系统
      配置示例:
        hosts: [cassandra-1, cassandra-2, cassandra-3]
        keyspace: microservice_ks
        consistency: quorum
        timeout: 10s
  
  time-series-databases: |
    时序数据库:
    
    InfluxDB:
      优势: 时序优化、高压缩比、SQL-like查询、可视化支持
      劣势: 单机版限制、集群版收费
      适用场景: 监控指标、IoT数据、实时分析
      配置示例:
        url: http://influxdb-service:8086
        database: metrics
        retention_policy: autogen
        precision: s
    
    TimescaleDB:
      优势: PostgreSQL兼容、SQL支持、自动分区、压缩
      劣势: 相对较新、生态系统小
      适用场景: 时序分析、PostgreSQL迁移、复杂查询
      配置示例:
        driver: postgres
        host: timescaledb-service
        port: 5432
        database: timeseries_db
        hypertable_chunk_time_interval: 1d
  
  selection-criteria: |
    选择标准:
    
    1. 数据特征:
       - 结构化 → 关系型数据库
       - 半结构化 → 文档数据库
       - 键值对 → 键值数据库
       - 时序数据 → 时序数据库
    
    2. 一致性要求:
       - 强一致性 → 关系型数据库
       - 最终一致性 → NoSQL数据库
    
    3. 扩展性需求:
       - 垂直扩展 → 关系型数据库
       - 水平扩展 → NoSQL数据库
    
    4. 查询复杂度:
       - 复杂查询 → SQL数据库
       - 简单查询 → NoSQL数据库
    
    5. 性能要求:
       - 读密集 → 读副本、缓存
       - 写密集 → 分片、NoSQL
       - 低延迟 → 内存数据库

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: database-patterns
  namespace: microservices
data:
  database-per-service: |
    每服务一个数据库模式:
    
    优势:
    - 服务独立性
    - 技术多样性
    - 故障隔离
    - 独立扩展
    
    挑战:
    - 数据一致性
    - 跨服务查询
    - 事务管理
    - 数据同步
    
    实施策略:
    - 事件驱动架构
    - Saga模式
    - CQRS模式
    - 数据复制
  
  shared-database: |
    共享数据库模式:
    
    优势:
    - 数据一致性
    - 简单事务
    - 跨服务查询
    - 运维简单
    
    挑战:
    - 服务耦合
    - 扩展困难
    - 技术锁定
    - 故障传播
    
    适用场景:
    - 小型系统
    - 紧密耦合的服务
    - 过渡阶段
  
  cqrs-pattern: |
    CQRS模式:
    
    概念:
    - Command Query Responsibility Segregation
    - 读写分离
    - 不同的数据模型
    
    优势:
    - 读写优化
    - 扩展性好
    - 复杂查询支持
    - 性能提升
    
    实现:
    - 写模型: 事务数据库
    - 读模型: 查询优化数据库
    - 事件同步: 异步更新

数据库连接池配置

// Java数据库连接池配置示例
@Configuration
public class DatabaseConfiguration {
    
    @Value("${spring.datasource.url}")
    private String url;
    
    @Value("${spring.datasource.username}")
    private String username;
    
    @Value("${spring.datasource.password}")
    private String password;
    
    @Bean
    @Primary
    public DataSource primaryDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(url);
        config.setUsername(username);
        config.setPassword(password);
        
        // 连接池配置
        config.setMaximumPoolSize(20);              // 最大连接数
        config.setMinimumIdle(5);                   // 最小空闲连接数
        config.setConnectionTimeout(30000);         // 连接超时时间(ms)
        config.setIdleTimeout(600000);              // 空闲超时时间(ms)
        config.setMaxLifetime(1800000);             // 连接最大生命周期(ms)
        config.setLeakDetectionThreshold(60000);    // 连接泄漏检测阈值(ms)
        
        // 连接测试
        config.setConnectionTestQuery("SELECT 1");
        config.setValidationTimeout(5000);
        
        // 性能优化
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.addDataSourceProperty("useServerPrepStmts", "true");
        config.addDataSourceProperty("useLocalSessionState", "true");
        config.addDataSourceProperty("rewriteBatchedStatements", "true");
        config.addDataSourceProperty("cacheResultSetMetadata", "true");
        config.addDataSourceProperty("cacheServerConfiguration", "true");
        config.addDataSourceProperty("elideSetAutoCommits", "true");
        config.addDataSourceProperty("maintainTimeStats", "false");
        
        return new HikariDataSource(config);
    }
    
    @Bean
    public DataSource readOnlyDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(url.replace("//master", "//slave"));
        config.setUsername(username);
        config.setPassword(password);
        
        // 只读配置
        config.setReadOnly(true);
        config.setMaximumPoolSize(10);
        config.setMinimumIdle(2);
        
        return new HikariDataSource(config);
    }
    
    @Bean
    public JdbcTemplate jdbcTemplate(@Qualifier("primaryDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
    
    @Bean
    public JdbcTemplate readOnlyJdbcTemplate(@Qualifier("readOnlyDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
}

// 数据源路由配置
@Component
public class DatabaseRouter {
    
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
    
    public static void setDataSourceType(String dataSourceType) {
        contextHolder.set(dataSourceType);
    }
    
    public static String getDataSourceType() {
        return contextHolder.get();
    }
    
    public static void clearDataSourceType() {
        contextHolder.remove();
    }
}

@Component
public class RoutingDataSource extends AbstractRoutingDataSource {
    
    @Override
    protected Object determineCurrentLookupKey() {
        return DatabaseRouter.getDataSourceType();
    }
}

// 读写分离注解
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}

@Aspect
@Component
public class ReadOnlyAspect {
    
    @Before("@annotation(readOnly)")
    public void setReadOnlyDataSource(ReadOnly readOnly) {
        DatabaseRouter.setDataSourceType("readOnly");
    }
    
    @After("@annotation(readOnly)")
    public void clearDataSource() {
        DatabaseRouter.clearDataSourceType();
    }
}

2.3 消息队列与事件流

消息队列选型

# 消息队列对比分析
class MessageQueueComparison:
    def __init__(self):
        self.queues = {
            "RabbitMQ": {
                "类型": "传统消息队列",
                "协议": ["AMQP", "STOMP", "MQTT"],
                "特性": {
                    "可靠性": "很高",
                    "性能": "中高",
                    "扩展性": "中等",
                    "易用性": "高",
                    "管理界面": "优秀"
                },
                "优势": [
                    "成熟稳定",
                    "功能丰富",
                    "多协议支持",
                    "管理界面友好",
                    "插件生态丰富"
                ],
                "劣势": [
                    "Erlang语言门槛",
                    "集群配置复杂",
                    "性能不是最优"
                ],
                "适用场景": [
                    "企业级应用",
                    "复杂路由需求",
                    "可靠性要求高",
                    "传统架构迁移"
                ],
                "吞吐量": "10K-100K msg/s",
                "延迟": "1-10ms"
            },
            "Apache Kafka": {
                "类型": "分布式流平台",
                "协议": ["Kafka Protocol"],
                "特性": {
                    "可靠性": "高",
                    "性能": "很高",
                    "扩展性": "很高",
                    "易用性": "中等",
                    "管理界面": "一般"
                },
                "优势": [
                    "高吞吐量",
                    "水平扩展",
                    "持久化存储",
                    "流处理支持",
                    "生态系统丰富"
                ],
                "劣势": [
                    "配置复杂",
                    "运维成本高",
                    "实时性一般",
                    "小消息效率低"
                ],
                "适用场景": [
                    "大数据处理",
                    "事件流",
                    "日志收集",
                    "实时分析"
                ],
                "吞吐量": "100K-1M+ msg/s",
                "延迟": "10-100ms"
            },
            "Redis Streams": {
                "类型": "内存流数据结构",
                "协议": ["Redis Protocol"],
                "特性": {
                    "可靠性": "中高",
                    "性能": "很高",
                    "扩展性": "中等",
                    "易用性": "高",
                    "管理界面": "一般"
                },
                "优势": [
                    "超高性能",
                    "简单易用",
                    "内存存储",
                    "Redis生态",
                    "多种数据结构"
                ],
                "劣势": [
                    "内存限制",
                    "持久化依赖配置",
                    "集群复杂"
                ],
                "适用场景": [
                    "高性能场景",
                    "实时处理",
                    "缓存集成",
                    "简单消息队列"
                ],
                "吞吐量": "100K-1M+ msg/s",
                "延迟": "<1ms"
            },
            "NATS": {
                "类型": "云原生消息系统",
                "协议": ["NATS Protocol"],
                "特性": {
                    "可靠性": "高",
                    "性能": "很高",
                    "扩展性": "高",
                    "易用性": "很高",
                    "管理界面": "简洁"
                },
                "优势": [
                    "轻量级",
                    "云原生",
                    "高性能",
                    "简单部署",
                    "多种模式"
                ],
                "劣势": [
                    "生态相对较小",
                    "持久化需要JetStream",
                    "企业特性有限"
                ],
                "适用场景": [
                    "微服务通信",
                    "云原生应用",
                    "IoT场景",
                    "边缘计算"
                ],
                "吞吐量": "1M+ msg/s",
                "延迟": "<1ms"
            }
        }
    
    def compare_by_criteria(self, criteria):
        """根据标准对比消息队列"""
        results = {}
        
        for name, details in self.queues.items():
            score = 0
            
            if criteria.get("performance") == "high":
                if details["特性"]["性能"] in ["很高"]:
                    score += 3
                elif details["特性"]["性能"] == "高":
                    score += 2
            
            if criteria.get("reliability") == "high":
                if details["特性"]["可靠性"] in ["很高", "高"]:
                    score += 2
            
            if criteria.get("scalability") == "high":
                if details["特性"]["扩展性"] in ["很高", "高"]:
                    score += 2
            
            if criteria.get("ease_of_use") == "high":
                if details["特性"]["易用性"] in ["很高", "高"]:
                    score += 1
            
            results[name] = {
                "score": score,
                "details": details
            }
        
        return sorted(results.items(), key=lambda x: x[1]["score"], reverse=True)
    
    def generate_selection_guide(self):
        """生成选择指南"""
        guide = {
            "高吞吐量场景": "Apache Kafka - 专为大数据和高吞吐量设计",
            "低延迟场景": "NATS 或 Redis Streams - 亚毫秒级延迟",
            "企业级应用": "RabbitMQ - 成熟稳定,功能丰富",
            "云原生应用": "NATS - 轻量级,云原生设计",
            "简单场景": "Redis Streams - 易于集成和使用",
            "复杂路由": "RabbitMQ - 支持复杂的消息路由",
            "流处理": "Apache Kafka - 内置流处理能力",
            "实时通信": "NATS - 专为实时通信优化"
        }
        return guide

# 使用示例
comparison = MessageQueueComparison()

# 根据需求对比
criteria = {
    "performance": "high",
    "reliability": "high",
    "scalability": "high",
    "ease_of_use": "high"
}

results = comparison.compare_by_criteria(criteria)
print("消息队列推荐排序:")
for name, result in results:
    print(f"{name}: {result['score']}分")

# 生成选择指南
guide = comparison.generate_selection_guide()
print("\n选择指南:")
for scenario, recommendation in guide.items():
    print(f"{scenario}: {recommendation}")

事件驱动架构实现

// 事件驱动架构实现
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"
)

// Event 事件接口
type Event interface {
    GetID() string
    GetType() string
    GetSource() string
    GetTimestamp() time.Time
    GetData() interface{}
}

// BaseEvent 基础事件
type BaseEvent struct {
    ID        string      `json:"id"`
    Type      string      `json:"type"`
    Source    string      `json:"source"`
    Timestamp time.Time   `json:"timestamp"`
    Data      interface{} `json:"data"`
}

func (e *BaseEvent) GetID() string        { return e.ID }
func (e *BaseEvent) GetType() string      { return e.Type }
func (e *BaseEvent) GetSource() string    { return e.Source }
func (e *BaseEvent) GetTimestamp() time.Time { return e.Timestamp }
func (e *BaseEvent) GetData() interface{} { return e.Data }

// EventHandler 事件处理器接口
type EventHandler interface {
    Handle(ctx context.Context, event Event) error
    GetEventTypes() []string
}

// EventBus 事件总线接口
type EventBus interface {
    Publish(ctx context.Context, event Event) error
    Subscribe(handler EventHandler) error
    Unsubscribe(handler EventHandler) error
    Start(ctx context.Context) error
    Stop() error
}

// InMemoryEventBus 内存事件总线实现
type InMemoryEventBus struct {
    handlers map[string][]EventHandler
    mutex    sync.RWMutex
    running  bool
    eventCh  chan Event
}

func NewInMemoryEventBus(bufferSize int) *InMemoryEventBus {
    return &InMemoryEventBus{
        handlers: make(map[string][]EventHandler),
        eventCh:  make(chan Event, bufferSize),
    }
}

func (bus *InMemoryEventBus) Publish(ctx context.Context, event Event) error {
    if !bus.running {
        return fmt.Errorf("event bus is not running")
    }
    
    select {
    case bus.eventCh <- event:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    default:
        return fmt.Errorf("event bus is full")
    }
}

func (bus *InMemoryEventBus) Subscribe(handler EventHandler) error {
    bus.mutex.Lock()
    defer bus.mutex.Unlock()
    
    for _, eventType := range handler.GetEventTypes() {
        if bus.handlers[eventType] == nil {
            bus.handlers[eventType] = make([]EventHandler, 0)
        }
        bus.handlers[eventType] = append(bus.handlers[eventType], handler)
    }
    
    log.Printf("Subscribed handler for events: %v", handler.GetEventTypes())
    return nil
}

func (bus *InMemoryEventBus) Unsubscribe(handler EventHandler) error {
    bus.mutex.Lock()
    defer bus.mutex.Unlock()
    
    for _, eventType := range handler.GetEventTypes() {
        handlers := bus.handlers[eventType]
        for i, h := range handlers {
            if h == handler {
                bus.handlers[eventType] = append(handlers[:i], handlers[i+1:]...)
                break
            }
        }
    }
    
    return nil
}

func (bus *InMemoryEventBus) Start(ctx context.Context) error {
    if bus.running {
        return fmt.Errorf("event bus is already running")
    }
    
    bus.running = true
    
    go func() {
        defer func() {
            bus.running = false
        }()
        
        for {
            select {
            case event := <-bus.eventCh:
                bus.handleEvent(ctx, event)
            case <-ctx.Done():
                log.Println("Event bus stopped")
                return
            }
        }
    }()
    
    log.Println("Event bus started")
    return nil
}

func (bus *InMemoryEventBus) Stop() error {
    bus.running = false
    close(bus.eventCh)
    return nil
}

func (bus *InMemoryEventBus) handleEvent(ctx context.Context, event Event) {
    bus.mutex.RLock()
    handlers := bus.handlers[event.GetType()]
    bus.mutex.RUnlock()
    
    if len(handlers) == 0 {
        log.Printf("No handlers for event type: %s", event.GetType())
        return
    }
    
    // 并发处理事件
    var wg sync.WaitGroup
    for _, handler := range handlers {
        wg.Add(1)
        go func(h EventHandler) {
            defer wg.Done()
            
            if err := h.Handle(ctx, event); err != nil {
                log.Printf("Error handling event %s: %v", event.GetID(), err)
            }
        }(handler)
    }
    
    wg.Wait()
}

// 具体事件类型
type UserCreatedEvent struct {
    *BaseEvent
    UserID   string `json:"user_id"`
    Username string `json:"username"`
    Email    string `json:"email"`
}

func NewUserCreatedEvent(userID, username, email string) *UserCreatedEvent {
    return &UserCreatedEvent{
        BaseEvent: &BaseEvent{
            ID:        fmt.Sprintf("user-created-%d", time.Now().UnixNano()),
            Type:      "user.created",
            Source:    "user-service",
            Timestamp: time.Now(),
            Data: map[string]interface{}{
                "user_id":  userID,
                "username": username,
                "email":    email,
            },
        },
        UserID:   userID,
        Username: username,
        Email:    email,
    }
}

type OrderCreatedEvent struct {
    *BaseEvent
    OrderID string  `json:"order_id"`
    UserID  string  `json:"user_id"`
    Amount  float64 `json:"amount"`
}

func NewOrderCreatedEvent(orderID, userID string, amount float64) *OrderCreatedEvent {
    return &OrderCreatedEvent{
        BaseEvent: &BaseEvent{
            ID:        fmt.Sprintf("order-created-%d", time.Now().UnixNano()),
            Type:      "order.created",
            Source:    "order-service",
            Timestamp: time.Now(),
            Data: map[string]interface{}{
                "order_id": orderID,
                "user_id":  userID,
                "amount":   amount,
            },
        },
        OrderID: orderID,
        UserID:  userID,
        Amount:  amount,
    }
}

// 事件处理器实现
type EmailNotificationHandler struct {
    serviceName string
}

func NewEmailNotificationHandler() *EmailNotificationHandler {
    return &EmailNotificationHandler{
        serviceName: "email-notification-service",
    }
}

func (h *EmailNotificationHandler) Handle(ctx context.Context, event Event) error {
    switch event.GetType() {
    case "user.created":
        return h.handleUserCreated(ctx, event)
    case "order.created":
        return h.handleOrderCreated(ctx, event)
    default:
        return fmt.Errorf("unsupported event type: %s", event.GetType())
    }
}

func (h *EmailNotificationHandler) GetEventTypes() []string {
    return []string{"user.created", "order.created"}
}

func (h *EmailNotificationHandler) handleUserCreated(ctx context.Context, event Event) error {
    data := event.GetData().(map[string]interface{})
    email := data["email"].(string)
    username := data["username"].(string)
    
    log.Printf("[%s] Sending welcome email to %s (%s)", h.serviceName, username, email)
    
    // 模拟发送邮件
    time.Sleep(100 * time.Millisecond)
    
    return nil
}

func (h *EmailNotificationHandler) handleOrderCreated(ctx context.Context, event Event) error {
    data := event.GetData().(map[string]interface{})
    orderID := data["order_id"].(string)
    userID := data["user_id"].(string)
    
    log.Printf("[%s] Sending order confirmation for order %s to user %s", h.serviceName, orderID, userID)
    
    // 模拟发送邮件
    time.Sleep(100 * time.Millisecond)
    
    return nil
}

type AuditLogHandler struct {
    serviceName string
}

func NewAuditLogHandler() *AuditLogHandler {
    return &AuditLogHandler{
        serviceName: "audit-log-service",
    }
}

func (h *AuditLogHandler) Handle(ctx context.Context, event Event) error {
    eventData, _ := json.Marshal(event)
    log.Printf("[%s] Audit log: %s", h.serviceName, string(eventData))
    return nil
}

func (h *AuditLogHandler) GetEventTypes() []string {
    return []string{"user.created", "order.created"}
}

// 使用示例
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // 创建事件总线
    eventBus := NewInMemoryEventBus(1000)
    
    // 创建事件处理器
    emailHandler := NewEmailNotificationHandler()
    auditHandler := NewAuditLogHandler()
    
    // 订阅事件
    eventBus.Subscribe(emailHandler)
    eventBus.Subscribe(auditHandler)
    
    // 启动事件总线
    if err := eventBus.Start(ctx); err != nil {
        log.Fatal("Failed to start event bus:", err)
    }
    
    // 模拟发布事件
    userEvent := NewUserCreatedEvent("user123", "john_doe", "john@example.com")
    if err := eventBus.Publish(ctx, userEvent); err != nil {
        log.Printf("Failed to publish user event: %v", err)
    }
    
    orderEvent := NewOrderCreatedEvent("order456", "user123", 99.99)
    if err := eventBus.Publish(ctx, orderEvent); err != nil {
        log.Printf("Failed to publish order event: %v", err)
    }
    
    // 等待事件处理完成
    time.Sleep(1 * time.Second)
    
    // 停止事件总线
    eventBus.Stop()
}

2.4 容器化与编排

Docker容器化

# 多阶段构建Dockerfile示例

# 构建阶段
FROM golang:1.21-alpine AS builder

# 设置工作目录
WORKDIR /app

# 安装依赖
RUN apk add --no-cache git ca-certificates tzdata

# 复制go mod文件
COPY go.mod go.sum ./

# 下载依赖
RUN go mod download

# 复制源代码
COPY . .

# 构建应用
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o main .

# 运行阶段
FROM alpine:latest

# 安装ca证书和时区数据
RUN apk --no-cache add ca-certificates tzdata

# 设置时区
ENV TZ=Asia/Shanghai

# 创建非root用户
RUN addgroup -g 1001 appgroup && \
    adduser -D -s /bin/sh -u 1001 -G appgroup appuser

# 设置工作目录
WORKDIR /app

# 从构建阶段复制二进制文件
COPY --from=builder /app/main .

# 复制配置文件
COPY --from=builder /app/config ./config

# 设置文件权限
RUN chown -R appuser:appgroup /app

# 切换到非root用户
USER appuser

# 暴露端口
EXPOSE 8080

# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD wget --no-verbose --tries=1 --spider http://localhost:8080/health || exit 1

# 启动应用
CMD ["./main"]

Kubernetes部署配置

# 微服务Kubernetes部署配置
apiVersion: v1
kind: Namespace
metadata:
  name: microservices
  labels:
    name: microservices
    environment: production

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: app-config
  namespace: microservices
data:
  app.yaml: |
    server:
      port: 8080
      read_timeout: 30s
      write_timeout: 30s
      idle_timeout: 120s
    
    database:
      host: postgres-service
      port: 5432
      name: microservice_db
      max_connections: 20
      max_idle: 5
      max_lifetime: 1h
    
    redis:
      host: redis-service
      port: 6379
      database: 0
      max_connections: 100
    
    logging:
      level: info
      format: json
      output: stdout
    
    metrics:
      enabled: true
      path: /metrics
      port: 9090
    
    tracing:
      enabled: true
      jaeger_endpoint: http://jaeger-collector:14268/api/traces
      sample_rate: 0.1

---
apiVersion: v1
kind: Secret
metadata:
  name: app-secrets
  namespace: microservices
type: Opaque
data:
  database-password: cGFzc3dvcmQxMjM=  # password123
  jwt-secret: c2VjcmV0a2V5MTIz          # secretkey123
  redis-password: cmVkaXNwYXNz          # redispass

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
  namespace: microservices
  labels:
    app: user-service
    version: v1
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: user-service
      version: v1
  template:
    metadata:
      labels:
        app: user-service
        version: v1
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9090"
        prometheus.io/path: "/metrics"
    spec:
      serviceAccountName: microservice-sa
      securityContext:
        runAsNonRoot: true
        runAsUser: 1001
        fsGroup: 1001
      containers:
      - name: user-service
        image: user-service:v1.0.0
        imagePullPolicy: IfNotPresent
        ports:
        - name: http
          containerPort: 8080
          protocol: TCP
        - name: metrics
          containerPort: 9090
          protocol: TCP
        env:
        - name: CONFIG_PATH
          value: "/config/app.yaml"
        - name: DATABASE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: database-password
        - name: JWT_SECRET
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: jwt-secret
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        volumeMounts:
        - name: config
          mountPath: /config
          readOnly: true
        - name: tmp
          mountPath: /tmp
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "256Mi"
            cpu: "200m"
        livenessProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 3
        readinessProbe:
          httpGet:
            path: /ready
            port: http
          initialDelaySeconds: 5
          periodSeconds: 5
          timeoutSeconds: 3
          failureThreshold: 3
        startupProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 10
          periodSeconds: 5
          timeoutSeconds: 3
          failureThreshold: 30
      volumes:
      - name: config
        configMap:
          name: app-config
      - name: tmp
        emptyDir: {}
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app
                  operator: In
                  values:
                  - user-service
              topologyKey: kubernetes.io/hostname
      tolerations:
      - key: "node.kubernetes.io/not-ready"
        operator: "Exists"
        effect: "NoExecute"
        tolerationSeconds: 300
      - key: "node.kubernetes.io/unreachable"
        operator: "Exists"
        effect: "NoExecute"
        tolerationSeconds: 300

---
apiVersion: v1
kind: Service
metadata:
  name: user-service
  namespace: microservices
  labels:
    app: user-service
spec:
  type: ClusterIP
  ports:
  - name: http
    port: 80
    targetPort: http
    protocol: TCP
  - name: metrics
    port: 9090
    targetPort: metrics
    protocol: TCP
  selector:
    app: user-service

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: microservice-sa
  namespace: microservices

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: microservices
  name: microservice-role
rules:
- apiGroups: [""]
  resources: ["configmaps", "secrets"]
  verbs: ["get", "list", "watch"]
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: microservice-rolebinding
  namespace: microservices
subjects:
- kind: ServiceAccount
  name: microservice-sa
  namespace: microservices
roleRef:
  kind: Role
  name: microservice-role
  apiGroup: rbac.authorization.k8s.io

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: user-service-hpa
  namespace: microservices
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: user-service
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
      - type: Pods
        value: 2
        periodSeconds: 60
      selectPolicy: Max

---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: user-service-pdb
  namespace: microservices
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: user-service

---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: user-service-netpol
  namespace: microservices
spec:
  podSelector:
    matchLabels:
      app: user-service
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          name: microservices
    - namespaceSelector:
        matchLabels:
          name: api-gateway
    ports:
    - protocol: TCP
      port: 8080
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          name: microservices
    ports:
    - protocol: TCP
      port: 5432  # PostgreSQL
    - protocol: TCP
      port: 6379  # Redis
  - to: []  # Allow DNS
    ports:
    - protocol: UDP
      port: 53

2.5 总结

本章详细介绍了微服务技术栈选择和架构设计的关键要素:

技术选型要点

  • 编程语言: 根据性能、团队经验、生态系统选择
  • 框架选择: 平衡功能丰富度和学习成本
  • 数据库技术: 基于数据特征和一致性需求选型
  • 消息队列: 考虑吞吐量、延迟、可靠性需求

架构设计原则

  • 容器化: 标准化部署和运行环境
  • 编排管理: 自动化部署、扩展和运维
  • 配置管理: 外部化配置和密钥管理
  • 可观测性: 内置监控、日志和追踪能力

最佳实践

  • 多阶段构建: 优化镜像大小和安全性
  • 健康检查: 确保服务可用性
  • 资源限制: 合理配置CPU和内存
  • 安全策略: 最小权限和网络隔离

下一章预告

下一章将深入探讨服务间通信,包括: - 同步通信模式(REST、gRPC) - 异步通信模式(消息队列、事件流) - 服务发现和负载均衡 - 通信安全和认证授权

通过本章的学习,您应该能够为微服务项目选择合适的技术栈,并设计出可扩展、可维护的架构方案。