概述

在微服务架构中,数据管理是最具挑战性的问题之一。与单体应用不同,微服务需要处理分布式数据存储、数据一致性、事务管理等复杂问题。本章将深入探讨微服务架构中的数据管理策略和一致性保证机制。

数据库设计模式

每服务一个数据库模式

每个微服务拥有自己的数据库,确保服务间的数据隔离。

# User-service database configuration. The database.yml key below is a file
# presumably read by the user service itself (not by PostgreSQL) — confirm.
apiVersion: v1
kind: ConfigMap
metadata:
  name: user-db-config
  namespace: microservices
data:
  # Everything under "database.yml: |" is a literal block scalar, i.e. the file
  # content verbatim; a comment written inside it would become part of the file,
  # so notes about it live here:
  #   - host/port point at the user-postgres Service (port 5432).
  #   - NOTE(review): Kubernetes does not expand ${DB_PASSWORD} in ConfigMap
  #     data, so this relies on the application substituting it when it loads
  #     the file — confirm. The StatefulSet itself takes the password from the
  #     user-db-secret Secret.
  #   - NOTE(review): the connection_pool timeouts look like milliseconds
  #     (idle_timeout 300000 = 5 minutes) — confirm with the consumer.
  database.yml: |
    production:
      adapter: postgresql
      host: user-postgres
      port: 5432
      database: user_service
      username: user_service
      password: ${DB_PASSWORD}
      pool: 20
      timeout: 5000
      encoding: unicode
      schema_search_path: "user_service,shared,public"
      
    migrations:
      enabled: true
      path: /app/db/migrations
      
    connection_pool:
      initial_size: 5
      max_size: 20
      checkout_timeout: 5000
      idle_timeout: 300000
---
# PostgreSQL instance owned exclusively by the user service ("one database per
# service"). replicas is 1, so there is no standby or failover instance.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: user-postgres
  namespace: microservices
spec:
  # Matches the headless user-postgres Service (clusterIP: None).
  serviceName: user-postgres
  replicas: 1
  selector:
    matchLabels:
      app: user-postgres
  template:
    metadata:
      labels:
        app: user-postgres
    spec:
      containers:
      - name: postgres
        image: postgres:15
        env:
        - name: POSTGRES_DB
          value: user_service
        - name: POSTGRES_USER
          value: user_service
        # The password is read from the user-db-secret Secret, not stored inline.
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: user-db-secret
              key: password
        # Data directory is a subdirectory of the mounted volume; presumably so
        # initdb gets an empty directory even if the volume root is not empty
        # (e.g. lost+found) — confirm.
        - name: PGDATA
          value: /var/lib/postgresql/data/pgdata
        ports:
        - containerPort: 5432
          name: postgres
        volumeMounts:
        - name: postgres-storage
          mountPath: /var/lib/postgresql/data
        # SQL files from the user-db-init ConfigMap.
        # NOTE(review): the postgres image is understood to run
        # /docker-entrypoint-initdb.d only when the data directory is empty
        # (first start) — confirm this is the intended lifecycle.
        - name: init-scripts
          mountPath: /docker-entrypoint-initdb.d
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        # Both probes run pg_isready as user_service; liveness waits 30s before
        # its first check, readiness only 5s.
        livenessProbe:
          exec:
            command:
            - pg_isready
            - -U
            - user_service
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          exec:
            command:
            - pg_isready
            - -U
            - user_service
          initialDelaySeconds: 5
          periodSeconds: 5
      volumes:
      - name: init-scripts
        configMap:
          name: user-db-init
  # One 10Gi ReadWriteOnce claim per replica, mounted as postgres-storage.
  volumeClaimTemplates:
  - metadata:
      name: postgres-storage
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi
---
# Headless Service (clusterIP: None) governing the user-postgres StatefulSet:
# the name resolves directly to the pod rather than to a virtual IP. This is
# the host:port (user-postgres:5432) used in user-db-config.
apiVersion: v1
kind: Service
metadata:
  name: user-postgres
  namespace: microservices
spec:
  selector:
    app: user-postgres
  ports:
  - port: 5432
    targetPort: 5432
  clusterIP: None
-- User service database initialization script
-- init-user-db.sql
--
-- Every statement is safe to run more than once: tables, indexes and functions
-- use IF NOT EXISTS / OR REPLACE, triggers are dropped before being recreated,
-- and the seed INSERT skips rows that already exist.

-- Users table.
-- status, version and the timestamps are NOT NULL because the Go repository
-- scans them into non-pointer fields (string, int, time.Time), where a NULL
-- would make the scan fail.
CREATE TABLE IF NOT EXISTS users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    phone VARCHAR(20),
    status VARCHAR(20) NOT NULL DEFAULT 'active',
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
    version INTEGER NOT NULL DEFAULT 1
);

-- User profile table; rows are removed together with their user.
CREATE TABLE IF NOT EXISTS user_profiles (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    avatar_url VARCHAR(255),
    bio TEXT,
    location VARCHAR(100),
    website VARCHAR(255),
    birth_date DATE,
    gender VARCHAR(10),
    preferences JSONB,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- User session table; rows are removed together with their user.
CREATE TABLE IF NOT EXISTS user_sessions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    session_token VARCHAR(255) UNIQUE NOT NULL,
    refresh_token VARCHAR(255),
    expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    last_accessed_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    ip_address INET,
    user_agent TEXT
);

-- Indexes.
-- users.username, users.email and user_sessions.session_token are UNIQUE, and
-- PostgreSQL already builds a unique index for every UNIQUE constraint, so a
-- second index on those columns would only duplicate it and slow down writes.
CREATE INDEX IF NOT EXISTS idx_users_status ON users(status);
CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at);

CREATE INDEX IF NOT EXISTS idx_user_profiles_user_id ON user_profiles(user_id);

CREATE INDEX IF NOT EXISTS idx_user_sessions_user_id ON user_sessions(user_id);
CREATE INDEX IF NOT EXISTS idx_user_sessions_expires_at ON user_sessions(expires_at);

-- Keep updated_at current on every UPDATE.
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- CREATE TRIGGER has no IF NOT EXISTS form; drop first so a re-run succeeds.
DROP TRIGGER IF EXISTS update_users_updated_at ON users;
CREATE TRIGGER update_users_updated_at BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

DROP TRIGGER IF EXISTS update_user_profiles_updated_at ON user_profiles;
CREATE TRIGGER update_user_profiles_updated_at BEFORE UPDATE ON user_profiles
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

-- Optimistic-locking version: incremented by one on every UPDATE of users.
CREATE OR REPLACE FUNCTION increment_version()
RETURNS TRIGGER AS $$
BEGIN
    -- COALESCE covers rows written before version was declared NOT NULL.
    NEW.version = COALESCE(OLD.version, 0) + 1;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS increment_users_version ON users;
CREATE TRIGGER increment_users_version BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION increment_version();

-- Sample data.
-- NOTE(review): '$2a$10$N9qo8uLOickgx2ZMRZoMye' is 29 characters, i.e. only the
-- bcrypt version/cost/salt prefix rather than a complete 60-character bcrypt
-- hash, so it should not verify against any password. Replace it (or drop these
-- seed rows) before this script reaches a real environment.
-- ON CONFLICT without a conflict target skips a row that would violate ANY
-- unique constraint (username or email), so a re-run never fails here.
INSERT INTO users (username, email, password_hash, first_name, last_name) VALUES
('admin', 'admin@example.com', '$2a$10$N9qo8uLOickgx2ZMRZoMye', 'Admin', 'User'),
('john_doe', 'john@example.com', '$2a$10$N9qo8uLOickgx2ZMRZoMye', 'John', 'Doe'),
('jane_smith', 'jane@example.com', '$2a$10$N9qo8uLOickgx2ZMRZoMye', 'Jane', 'Smith')
ON CONFLICT DO NOTHING;
// Go语言数据访问层实现
package repository

import (
    "context"
    "database/sql"
    "database/sql/driver"
    "encoding/json"
    "fmt"
    "time"

    "github.com/google/uuid"
    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
)

// User is a row of the users table. The `db` tags name the columns used by the
// PostgresUserRepository queries; the `json` tags define the serialized shape.
type User struct {
    ID           uuid.UUID  `json:"id" db:"id"`
    Username     string     `json:"username" db:"username"`
    Email        string     `json:"email" db:"email"`
    // PasswordHash is never serialized to JSON (json:"-").
    PasswordHash string     `json:"-" db:"password_hash"`
    // Optional name/contact fields; nil corresponds to SQL NULL.
    FirstName    *string    `json:"first_name" db:"first_name"`
    LastName     *string    `json:"last_name" db:"last_name"`
    Phone        *string    `json:"phone" db:"phone"`
    // Status is "deleted" for soft-deleted users; the repository's read
    // queries exclude those rows.
    Status       string     `json:"status" db:"status"`
    CreatedAt    time.Time  `json:"created_at" db:"created_at"`
    UpdatedAt    time.Time  `json:"updated_at" db:"updated_at"`
    // Version is the optimistic-locking counter matched in Update's WHERE clause.
    Version      int        `json:"version" db:"version"`
}

// UserProfile is a row of the user_profiles table: optional profile details
// belonging to one user (UserID). Optional columns are pointers so that SQL
// NULL round-trips as nil.
type UserProfile struct {
    ID          uuid.UUID  `json:"id" db:"id"`
    UserID      uuid.UUID  `json:"user_id" db:"user_id"`
    AvatarURL   *string    `json:"avatar_url" db:"avatar_url"`
    Bio         *string    `json:"bio" db:"bio"`
    Location    *string    `json:"location" db:"location"`
    Website     *string    `json:"website" db:"website"`
    BirthDate   *time.Time `json:"birth_date" db:"birth_date"`
    Gender      *string    `json:"gender" db:"gender"`
    // Preferences maps the JSONB preferences column. It is a JSONMap rather
    // than a bare map[string]interface{} because database/sql can neither scan
    // a column into, nor bind a parameter from, a plain map; JSONMap supplies
    // the sql.Scanner / driver.Valuer conversions. Its underlying type is still
    // map[string]interface{}, so existing map literals remain assignable.
    Preferences JSONMap    `json:"preferences" db:"preferences"`
    CreatedAt   time.Time  `json:"created_at" db:"created_at"`
    UpdatedAt   time.Time  `json:"updated_at" db:"updated_at"`
}

// JSONMap is a JSON object stored in a json/jsonb column.
type JSONMap map[string]interface{}

// Value implements driver.Valuer. A nil map is stored as SQL NULL; any other
// map is JSON-encoded and passed as a string so the driver sends it as text
// rather than as binary data.
func (m JSONMap) Value() (driver.Value, error) {
    if m == nil {
        return nil, nil
    }
    encoded, err := json.Marshal(m)
    if err != nil {
        return nil, fmt.Errorf("failed to encode JSON map: %w", err)
    }
    return string(encoded), nil
}

// Scan implements sql.Scanner. SQL NULL yields a nil map; []byte or string
// input is decoded as a JSON object, replacing any previous contents.
func (m *JSONMap) Scan(src interface{}) error {
    var raw []byte
    switch v := src.(type) {
    case nil:
        *m = nil
        return nil
    case []byte:
        raw = v
    case string:
        raw = []byte(v)
    default:
        return fmt.Errorf("cannot scan %T into JSONMap", src)
    }

    // Decode into a fresh map: json.Unmarshal merges into an existing one,
    // which would leak keys from a previously scanned row.
    decoded := JSONMap{}
    if err := json.Unmarshal(raw, &decoded); err != nil {
        return fmt.Errorf("failed to decode JSON map: %w", err)
    }
    *m = decoded
    return nil
}

// UserRepository is the persistence contract for User entities.
type UserRepository interface {
    // Create persists a new user and fills in its server-generated fields.
    Create(ctx context.Context, user *User) error
    // GetByID, GetByEmail and GetByUsername look up a single user.
    GetByID(ctx context.Context, id uuid.UUID) (*User, error)
    GetByEmail(ctx context.Context, email string) (*User, error)
    GetByUsername(ctx context.Context, username string) (*User, error)
    // Update writes the user's mutable fields.
    Update(ctx context.Context, user *User) error
    // Delete removes the user with the given id.
    Delete(ctx context.Context, id uuid.UUID) error
    // List returns up to limit users, skipping the first offset.
    List(ctx context.Context, limit, offset int) ([]*User, error)
    // Count returns the number of users.
    Count(ctx context.Context) (int64, error)
}

// PostgresUserRepository implements UserRepository on PostgreSQL through sqlx.
type PostgresUserRepository struct {
    db *sqlx.DB
}

// NewPostgresUserRepository wraps db and returns it behind the UserRepository
// interface.
func NewPostgresUserRepository(db *sqlx.DB) UserRepository {
    repo := new(PostgresUserRepository)
    repo.db = db
    return repo
}

// Create inserts user and copies the server-generated columns (id, created_at,
// updated_at, version) back into it from the RETURNING clause.
//
// It returns an error if the insert fails. This includes failures that the
// driver reports only while the result set is being read (surfaced through
// rows.Err). It also returns an error if no row comes back.
func (r *PostgresUserRepository) Create(ctx context.Context, user *User) error {
    query := `
        INSERT INTO users (username, email, password_hash, first_name, last_name, phone, status)
        VALUES (:username, :email, :password_hash, :first_name, :last_name, :phone, :status)
        RETURNING id, created_at, updated_at, version
    `

    rows, err := r.db.NamedQueryContext(ctx, query, user)
    if err != nil {
        return fmt.Errorf("failed to create user: %w", err)
    }
    defer rows.Close()

    // When Next returns false the statement may still have failed; that error
    // is only available from rows.Err(), so this must not count as success.
    if !rows.Next() {
        if err := rows.Err(); err != nil {
            return fmt.Errorf("failed to create user: %w", err)
        }
        return fmt.Errorf("failed to create user: no row returned")
    }

    if err := rows.Scan(&user.ID, &user.CreatedAt, &user.UpdatedAt, &user.Version); err != nil {
        return fmt.Errorf("failed to scan user: %w", err)
    }

    return nil
}

// GetByID loads the user with the given id, ignoring soft-deleted rows.
// A missing user yields a "user not found" error that wraps sql.ErrNoRows.
func (r *PostgresUserRepository) GetByID(ctx context.Context, id uuid.UUID) (*User, error) {
    const query = `
        SELECT id, username, email, password_hash, first_name, last_name, phone, status,
               created_at, updated_at, version
        FROM users
        WHERE id = $1 AND status != 'deleted'
    `

    found := new(User)
    switch err := r.db.GetContext(ctx, found, query, id); {
    case err == nil:
        return found, nil
    case err == sql.ErrNoRows:
        return nil, fmt.Errorf("user not found: %w", err)
    default:
        return nil, fmt.Errorf("failed to get user: %w", err)
    }
}

// GetByEmail loads the user with the given email, ignoring soft-deleted rows.
// A missing user yields a "user not found" error that wraps sql.ErrNoRows.
func (r *PostgresUserRepository) GetByEmail(ctx context.Context, email string) (*User, error) {
    const query = `
        SELECT id, username, email, password_hash, first_name, last_name, phone, status,
               created_at, updated_at, version
        FROM users
        WHERE email = $1 AND status != 'deleted'
    `

    found := new(User)
    switch err := r.db.GetContext(ctx, found, query, email); {
    case err == nil:
        return found, nil
    case err == sql.ErrNoRows:
        return nil, fmt.Errorf("user not found: %w", err)
    default:
        return nil, fmt.Errorf("failed to get user: %w", err)
    }
}

// GetByUsername loads the user with the given username, ignoring soft-deleted
// rows. A missing user yields a "user not found" error that wraps sql.ErrNoRows.
func (r *PostgresUserRepository) GetByUsername(ctx context.Context, username string) (*User, error) {
    const query = `
        SELECT id, username, email, password_hash, first_name, last_name, phone, status,
               created_at, updated_at, version
        FROM users
        WHERE username = $1 AND status != 'deleted'
    `

    found := new(User)
    switch err := r.db.GetContext(ctx, found, query, username); {
    case err == nil:
        return found, nil
    case err == sql.ErrNoRows:
        return nil, fmt.Errorf("user not found: %w", err)
    default:
        return nil, fmt.Errorf("failed to get user: %w", err)
    }
}

// Update writes the mutable columns of user with optimistic locking. The row
// changes only if its stored version equals user.Version. On success,
// user.UpdatedAt and user.Version are refreshed from the RETURNING clause.
//
// It returns "user not found or version mismatch" when no row matched. When
// the statement itself failed (e.g. a unique violation on email), it returns
// the wrapped driver error instead. Such failures may only surface through
// rows.Err().
func (r *PostgresUserRepository) Update(ctx context.Context, user *User) error {
    query := `
        UPDATE users
        SET username = :username, email = :email, password_hash = :password_hash,
            first_name = :first_name, last_name = :last_name, phone = :phone, status = :status
        WHERE id = :id AND version = :version
        RETURNING updated_at, version
    `

    rows, err := r.db.NamedQueryContext(ctx, query, user)
    if err != nil {
        return fmt.Errorf("failed to update user: %w", err)
    }
    defer rows.Close()

    if !rows.Next() {
        // Tell a failed statement apart from a genuine "no row matched".
        if err := rows.Err(); err != nil {
            return fmt.Errorf("failed to update user: %w", err)
        }
        return fmt.Errorf("user not found or version mismatch")
    }

    if err := rows.Scan(&user.UpdatedAt, &user.Version); err != nil {
        return fmt.Errorf("failed to scan updated user: %w", err)
    }

    return nil
}

// Delete soft-deletes a user by setting its status to 'deleted'; the row is
// kept. A user that does not exist, or is already soft-deleted, yields
// "user not found". This matches the read methods of this repository, which
// all exclude deleted rows.
func (r *PostgresUserRepository) Delete(ctx context.Context, id uuid.UUID) error {
    // Excluding already-deleted rows stops a repeated Delete from rewriting a
    // tombstone. Any UPDATE triggers on users would otherwise fire again.
    query := `UPDATE users SET status = 'deleted' WHERE id = $1 AND status != 'deleted'`

    result, err := r.db.ExecContext(ctx, query, id)
    if err != nil {
        return fmt.Errorf("failed to delete user: %w", err)
    }

    rowsAffected, err := result.RowsAffected()
    if err != nil {
        return fmt.Errorf("failed to get rows affected: %w", err)
    }
    if rowsAffected == 0 {
        return fmt.Errorf("user not found")
    }

    return nil
}

// List returns one page of non-deleted users, newest first. limit and offset
// are passed straight to the LIMIT/OFFSET clauses.
func (r *PostgresUserRepository) List(ctx context.Context, limit, offset int) ([]*User, error) {
    const query = `
        SELECT id, username, email, password_hash, first_name, last_name, phone, status,
               created_at, updated_at, version
        FROM users
        WHERE status != 'deleted'
        ORDER BY created_at DESC
        LIMIT $1 OFFSET $2
    `

    var page []*User
    if err := r.db.SelectContext(ctx, &page, query, limit, offset); err != nil {
        return nil, fmt.Errorf("failed to list users: %w", err)
    }
    return page, nil
}

// Count returns the number of users that are not soft-deleted.
func (r *PostgresUserRepository) Count(ctx context.Context) (int64, error) {
    var total int64
    if err := r.db.GetContext(ctx, &total, `SELECT COUNT(*) FROM users WHERE status != 'deleted'`); err != nil {
        return 0, fmt.Errorf("failed to count users: %w", err)
    }
    return total, nil
}

// DatabaseManager owns the shared sqlx connection pool. It exposes the pool
// through GetDB and offers lifecycle (Close), health (Health) and transaction
// (WithTransaction) helpers.
type DatabaseManager struct {
    db *sqlx.DB
}

// NewDatabaseManager connects to the PostgreSQL database at databaseURL with
// sqlx.Connect and applies fixed connection-pool limits.
func NewDatabaseManager(databaseURL string) (*DatabaseManager, error) {
    // Pool limits. NOTE(review): these presumably mirror max_size/initial_size
    // in the user-db-config ConfigMap — keep them in sync.
    const (
        maxOpenConns    = 20
        maxIdleConns    = 5
        connMaxLifetime = time.Hour
    )

    pool, err := sqlx.Connect("postgres", databaseURL)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to database: %w", err)
    }

    pool.SetMaxOpenConns(maxOpenConns)
    pool.SetMaxIdleConns(maxIdleConns)
    pool.SetConnMaxLifetime(connMaxLifetime)

    return &DatabaseManager{db: pool}, nil
}

// Close shuts down the underlying connection pool.
func (dm *DatabaseManager) Close() error {
    if err := dm.db.Close(); err != nil {
        return err
    }
    return nil
}

// Health pings the database, honoring ctx for cancellation and deadlines.
func (dm *DatabaseManager) Health(ctx context.Context) error {
    if err := dm.db.PingContext(ctx); err != nil {
        return err
    }
    return nil
}

// GetDB exposes the underlying pool to repositories.
func (dm *DatabaseManager) GetDB() *sqlx.DB {
    pool := dm.db
    return pool
}

// WithTransaction runs fn inside a database transaction.
//
// If fn returns nil, the transaction is committed and any commit error is
// returned. If fn returns an error, the transaction is rolled back and fn's
// error is returned. If fn panics, the transaction is rolled back and the
// panic is re-raised.
//
// err is a named result on purpose: the deferred function assigns the commit
// error to it. With an unnamed result that assignment would happen after the
// return value was already fixed, and commit failures would be silently lost.
func (dm *DatabaseManager) WithTransaction(ctx context.Context, fn func(*sqlx.Tx) error) (err error) {
    tx, err := dm.db.BeginTxx(ctx, nil)
    if err != nil {
        return fmt.Errorf("failed to begin transaction: %w", err)
    }

    defer func() {
        if p := recover(); p != nil {
            _ = tx.Rollback() // best effort; the panic is what matters
            panic(p)
        }
        if err != nil {
            _ = tx.Rollback() // best effort; fn's error is returned instead
            return
        }
        if cerr := tx.Commit(); cerr != nil {
            err = fmt.Errorf("failed to commit transaction: %w", cerr)
        }
    }()

    return fn(tx)
}

两阶段提交(2PC)

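两阶段提交协议通过“准备—提交”两个阶段确保分布式事务的原子性:协调者先请求所有参与者准备(暂存变更但不生效),只有全部准备成功才发出提交指令,否则通知所有参与者回滚。其代价是协议具有阻塞性——若协调者或某个参与者在提交阶段故障,已准备的参与者可能长时间持有资源等待最终决议。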

// 两阶段提交实现
package twopc

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// TransactionManager is an in-memory two-phase-commit (2PC) coordinator. It
// keeps a registry of named participants and the state of every transaction
// begun through it. Both maps are guarded by mutex.
type TransactionManager struct {
    participants map[string]Participant
    transactions map[string]*Transaction
    mutex        sync.RWMutex
}

// Participant is a resource taking part in a two-phase commit.
//
// Prepare stages the work for txID (described by data) without making it
// final, Commit makes the staged work final, and Abort discards it. The
// coordinator calls Abort on every participant of a failed transaction,
// including ones that never prepared, so Abort should tolerate unknown txIDs.
type Participant interface {
    Prepare(ctx context.Context, txID string, data interface{}) error
    Commit(ctx context.Context, txID string) error
    Abort(ctx context.Context, txID string) error
}

// Transaction is the coordinator's record of one distributed transaction.
// ID, Participants, Data and StartTime are fixed at creation. Status and
// EndTime change over the transaction's life and are guarded by mutex.
type Transaction struct {
    ID           string
    Status       TransactionStatus
    Participants []string
    Data         interface{}
    StartTime    time.Time
    EndTime      *time.Time
    mutex        sync.RWMutex
}

// TransactionStatus is the coordinator-side state of a Transaction.
type TransactionStatus int

const (
    TxStatusPending    TransactionStatus = iota // begun; commit not yet requested
    TxStatusPreparing                           // phase 1 in progress
    TxStatusPrepared                            // every participant prepared
    TxStatusCommitting                          // phase 2 in progress; stays here if a commit fails
    TxStatusCommitted                           // every participant committed
    TxStatusAborting                            // abort in progress
    TxStatusAborted                             // abort finished (per-participant abort errors are only logged)
)

// NewTransactionManager returns a coordinator with no participants and no
// transactions.
func NewTransactionManager() *TransactionManager {
    return &TransactionManager{
        participants: make(map[string]Participant),
        transactions: make(map[string]*Transaction),
    }
}

// RegisterParticipant adds participant under name, replacing any previous one.
func (tm *TransactionManager) RegisterParticipant(name string, participant Participant) {
    tm.mutex.Lock()
    defer tm.mutex.Unlock()
    tm.participants[name] = participant
}

// BeginTransaction records a new pending transaction txID over the named
// participants; data is handed to each participant's Prepare. It fails if
// txID is already in use or any participant name is not registered.
func (tm *TransactionManager) BeginTransaction(txID string, participantNames []string, data interface{}) error {
    tm.mutex.Lock()
    defer tm.mutex.Unlock()

    if _, exists := tm.transactions[txID]; exists {
        return fmt.Errorf("transaction %s already exists", txID)
    }

    // Every participant must be registered.
    for _, name := range participantNames {
        if _, exists := tm.participants[name]; !exists {
            return fmt.Errorf("participant %s not registered", name)
        }
    }

    // Copy the names so a later change to the caller's slice cannot alter
    // which participants this transaction drives.
    names := make([]string, len(participantNames))
    copy(names, participantNames)

    tm.transactions[txID] = &Transaction{
        ID:           txID,
        Status:       TxStatusPending,
        Participants: names,
        Data:         data,
        StartTime:    time.Now(),
    }
    return nil
}

// CommitTransaction runs both 2PC phases for a pending transaction.
//
// Phase 1 calls Prepare on every participant in order. On the first failure
// the transaction is aborted on all participants, and the returned error
// starts with "transaction aborted" and names the participant and cause.
// Phase 2 calls Commit on every participant. A commit failure does not stop
// the loop: once all participants have prepared, the decision is to commit.
// The first failure is returned, wrapped, and the transaction stays in
// TxStatusCommitting for retry or manual intervention.
//
// A transaction that is not pending (already committed, aborted, or being
// committed by another caller) is rejected without calling any participant.
func (tm *TransactionManager) CommitTransaction(ctx context.Context, txID string) error {
    tx, participants, exists := tm.resolveTransaction(txID)
    if !exists {
        return fmt.Errorf("transaction %s not found", txID)
    }

    // Claim the transaction atomically so Prepare/Commit never run twice.
    tx.mutex.Lock()
    if tx.Status != TxStatusPending {
        status := tx.Status
        tx.mutex.Unlock()
        return fmt.Errorf("transaction %s is not pending (status %d)", txID, status)
    }
    tx.Status = TxStatusPreparing
    tx.mutex.Unlock()

    // Phase 1: prepare.
    log.Printf("Starting prepare phase for transaction %s", txID)
    for i, name := range tx.Participants {
        if err := participants[i].Prepare(ctx, txID, tx.Data); err != nil {
            log.Printf("Participant %s failed to prepare: %v", name, err)
            abortErr := tm.abortTransaction(ctx, txID)
            // abortErr comes first so the message keeps its "transaction
            // aborted" prefix; the prepare failure is appended so the cause
            // is not lost.
            return fmt.Errorf("%w: participant %s failed to prepare: %v", abortErr, name, err)
        }
        log.Printf("Participant %s prepared successfully", name)
    }

    tx.setStatus(TxStatusPrepared)
    log.Printf("All participants prepared for transaction %s", txID)

    // Phase 2: commit.
    tx.setStatus(TxStatusCommitting)
    log.Printf("Starting commit phase for transaction %s", txID)

    var firstErr error
    var firstFailed string
    for i, name := range tx.Participants {
        if err := participants[i].Commit(ctx, txID); err != nil {
            log.Printf("Participant %s failed to commit: %v", name, err)
            if firstErr == nil {
                firstErr, firstFailed = err, name
            }
            // Keep going: the remaining participants must not stay prepared.
            continue
        }
        log.Printf("Participant %s committed successfully", name)
    }
    if firstErr != nil {
        // The failed participant(s) need retry or manual intervention.
        return fmt.Errorf("commit failed for participant %s: %w", firstFailed, firstErr)
    }

    tx.finish(TxStatusCommitted)
    log.Printf("Transaction %s committed successfully", txID)
    return nil
}

// abortTransaction calls Abort on every participant of txID, logging rather
// than returning individual failures, and marks it TxStatusAborted. It always
// returns a non-nil error: "transaction aborted", or a not-found error for an
// unknown txID.
func (tm *TransactionManager) abortTransaction(ctx context.Context, txID string) error {
    tx, participants, exists := tm.resolveTransaction(txID)
    if !exists {
        return fmt.Errorf("transaction %s not found", txID)
    }

    tx.setStatus(TxStatusAborting)
    log.Printf("Aborting transaction %s", txID)

    for i, name := range tx.Participants {
        if err := participants[i].Abort(ctx, txID); err != nil {
            log.Printf("Participant %s failed to abort: %v", name, err)
        }
    }

    tx.finish(TxStatusAborted)
    log.Printf("Transaction %s aborted", txID)
    return fmt.Errorf("transaction aborted")
}

// resolveTransaction looks up txID and resolves its participant names to
// Participant values. Both lookups happen under tm.mutex, so they cannot
// race with RegisterParticipant.
func (tm *TransactionManager) resolveTransaction(txID string) (*Transaction, []Participant, bool) {
    tm.mutex.RLock()
    defer tm.mutex.RUnlock()

    tx, exists := tm.transactions[txID]
    if !exists {
        return nil, nil, false
    }
    resolved := make([]Participant, len(tx.Participants))
    for i, name := range tx.Participants {
        resolved[i] = tm.participants[name]
    }
    return tx, resolved, true
}

// setStatus records an intermediate status under the transaction's lock.
func (tx *Transaction) setStatus(status TransactionStatus) {
    tx.mutex.Lock()
    tx.Status = status
    tx.mutex.Unlock()
}

// finish records a terminal status together with the end time.
func (tx *Transaction) finish(status TransactionStatus) {
    now := time.Now()
    tx.mutex.Lock()
    tx.Status = status
    tx.EndTime = &now
    tx.mutex.Unlock()
}

// Database participant implementation.

// DatabaseParticipant is a simulated 2PC participant. It only records which
// transactions are prepared (with their data) in memory; no database work
// happens yet. preparedTx is guarded by mutex.
type DatabaseParticipant struct {
    name       string
    db         interface{}            // database connection (currently unused)
    preparedTx map[string]interface{} // txID -> data passed to Prepare
    mutex      sync.RWMutex
}

// NewDatabaseParticipant returns a participant named name wrapping db, with no
// prepared transactions.
func NewDatabaseParticipant(name string, db interface{}) *DatabaseParticipant {
    participant := &DatabaseParticipant{name: name, db: db}
    participant.preparedTx = map[string]interface{}{}
    return participant
}

// locked runs fn while holding the participant's exclusive lock.
func (dp *DatabaseParticipant) locked(fn func()) {
    dp.mutex.Lock()
    defer dp.mutex.Unlock()
    fn()
}

// Prepare records data as prepared under txID. Simulated: a real
// implementation would open a database transaction here and run the work
// without committing it. It always returns nil.
func (dp *DatabaseParticipant) Prepare(ctx context.Context, txID string, data interface{}) error {
    log.Printf("[%s] Preparing transaction %s", dp.name, txID)
    dp.locked(func() { dp.preparedTx[txID] = data })
    log.Printf("[%s] Transaction %s prepared", dp.name, txID)
    return nil
}

// Commit forgets the prepared txID (simulated commit). It fails if txID was
// never prepared or has already been committed or aborted.
func (dp *DatabaseParticipant) Commit(ctx context.Context, txID string) error {
    log.Printf("[%s] Committing transaction %s", dp.name, txID)

    var wasPrepared bool
    dp.locked(func() {
        if _, wasPrepared = dp.preparedTx[txID]; wasPrepared {
            delete(dp.preparedTx, txID)
        }
    })
    if !wasPrepared {
        return fmt.Errorf("transaction %s not prepared", txID)
    }

    log.Printf("[%s] Transaction %s committed", dp.name, txID)
    return nil
}

// Abort forgets txID (simulated rollback). Unknown txIDs are accepted, and it
// always returns nil.
func (dp *DatabaseParticipant) Abort(ctx context.Context, txID string) error {
    log.Printf("[%s] Aborting transaction %s", dp.name, txID)
    dp.locked(func() { delete(dp.preparedTx, txID) })
    log.Printf("[%s] Transaction %s aborted", dp.name, txID)
    return nil
}

CQRS模式

CQRS(Command Query Responsibility Segregation)将读写操作分离,提高系统性能和可扩展性。

// CQRS模式实现
package cqrs

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"
    
    "github.com/google/uuid"
)

// Command is a request to change state. GetType selects the CommandHandler
// that CommandBus dispatches it to.
type Command interface {
    GetID() string
    GetType() string
    GetData() interface{}
}

// Query is a read-only request. GetType selects the QueryHandler that
// QueryBus dispatches it to; GetCriteria carries the query parameters.
type Query interface {
    GetID() string
    GetType() string
    GetCriteria() interface{}
}

// Event records something that happened to the aggregate identified by
// GetAggregateID. GetType selects the EventHandlers that EventBus notifies.
type Event interface {
    GetID() string
    GetType() string
    GetAggregateID() string
    GetData() interface{}
    GetTimestamp() time.Time
}

// CommandHandler executes one kind of Command and returns the Events it produced.
type CommandHandler interface {
    Handle(ctx context.Context, cmd Command) ([]Event, error)
}

// QueryHandler answers one kind of Query.
type QueryHandler interface {
    Handle(ctx context.Context, query Query) (interface{}, error)
}

// EventHandler reacts to one kind of Event (for example, by updating a read model).
type EventHandler interface {
    Handle(ctx context.Context, event Event) error
}

// CommandBus routes each Command to the single CommandHandler registered for
// its type. The handler map is guarded by mutex, so registration and dispatch
// may run concurrently.
type CommandBus struct {
    handlers map[string]CommandHandler
    mutex    sync.RWMutex
}

// NewCommandBus returns a CommandBus with no handlers.
func NewCommandBus() *CommandBus {
    bus := &CommandBus{}
    bus.handlers = make(map[string]CommandHandler)
    return bus
}

// RegisterHandler binds handler to commandType, replacing any previous handler.
func (cb *CommandBus) RegisterHandler(commandType string, handler CommandHandler) {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()
    cb.handlers[commandType] = handler
}

// Execute dispatches cmd to its handler and returns the handler's events.
// It fails if no handler is registered for the command's type.
func (cb *CommandBus) Execute(ctx context.Context, cmd Command) ([]Event, error) {
    commandType := cmd.GetType()

    cb.mutex.RLock()
    handler, found := cb.handlers[commandType]
    cb.mutex.RUnlock()

    if found {
        log.Printf("Executing command %s of type %s", cmd.GetID(), commandType)
        return handler.Handle(ctx, cmd)
    }
    return nil, fmt.Errorf("no handler found for command type: %s", commandType)
}

// QueryBus routes each Query to the single QueryHandler registered for its
// type. The handler map is guarded by mutex, so registration and dispatch may
// run concurrently.
type QueryBus struct {
    handlers map[string]QueryHandler
    mutex    sync.RWMutex
}

// NewQueryBus returns a QueryBus with no handlers.
func NewQueryBus() *QueryBus {
    bus := &QueryBus{}
    bus.handlers = make(map[string]QueryHandler)
    return bus
}

// RegisterHandler binds handler to queryType, replacing any previous handler.
func (qb *QueryBus) RegisterHandler(queryType string, handler QueryHandler) {
    qb.mutex.Lock()
    defer qb.mutex.Unlock()
    qb.handlers[queryType] = handler
}

// Execute dispatches query to its handler and returns the handler's result.
// It fails if no handler is registered for the query's type.
func (qb *QueryBus) Execute(ctx context.Context, query Query) (interface{}, error) {
    queryType := query.GetType()

    qb.mutex.RLock()
    handler, found := qb.handlers[queryType]
    qb.mutex.RUnlock()

    if found {
        log.Printf("Executing query %s of type %s", query.GetID(), queryType)
        return handler.Handle(ctx, query)
    }
    return nil, fmt.Errorf("no handler found for query type: %s", queryType)
}

// EventBus fans each published Event out to every EventHandler registered for
// its type. Handlers run asynchronously, one goroutine per (event, handler)
// pair. Wait blocks until all goroutines started so far have returned.
type EventBus struct {
    handlers map[string][]EventHandler
    mutex    sync.RWMutex
    // inflight counts handler goroutines started by Publish that have not yet
    // returned, so their lifetime can be observed through Wait.
    inflight sync.WaitGroup
}

// NewEventBus returns an EventBus with no handlers.
func NewEventBus() *EventBus {
    return &EventBus{
        handlers: make(map[string][]EventHandler),
    }
}

// RegisterHandler adds handler to the handlers notified for eventType.
// Handlers accumulate; earlier registrations are kept.
func (eb *EventBus) RegisterHandler(eventType string, handler EventHandler) {
    eb.mutex.Lock()
    defer eb.mutex.Unlock()
    // Appending to a missing key starts from a nil slice, so no
    // pre-initialisation is needed.
    eb.handlers[eventType] = append(eb.handlers[eventType], handler)
}

// Publish starts every registered handler for each event in its own goroutine
// and returns immediately. It always returns nil. Handler errors are logged,
// not returned, and events with no handlers are logged and skipped.
//
// NOTE: handlers receive ctx unchanged. If ctx is a request context that is
// cancelled when the caller returns, handlers still running may fail. Pass a
// context that outlives the handlers, and call Wait, when the side effects
// must complete.
func (eb *EventBus) Publish(ctx context.Context, events []Event) error {
    for _, event := range events {
        eb.mutex.RLock()
        handlers, exists := eb.handlers[event.GetType()]
        eb.mutex.RUnlock()

        if !exists {
            log.Printf("No handlers found for event type: %s", event.GetType())
            continue
        }

        log.Printf("Publishing event %s of type %s", event.GetID(), event.GetType())

        for _, handler := range handlers {
            eb.inflight.Add(1)
            go func(h EventHandler, e Event) {
                defer eb.inflight.Done()
                if err := h.Handle(ctx, e); err != nil {
                    log.Printf("Error handling event %s: %v", e.GetID(), err)
                }
            }(handler, event)
        }
    }

    return nil
}

// Wait blocks until every handler goroutine started by Publish has returned.
// Per sync.WaitGroup rules, Wait must not race with a Publish that starts the
// first in-flight handler; call it after the Publish calls it should cover.
func (eb *EventBus) Wait() {
    eb.inflight.Wait()
}

// Concrete implementations.

// CreateUserCommand asks for a new user account. It is routed to the handler
// registered for the "CreateUser" type.
type CreateUserCommand struct {
    ID       string
    Username string
    Email    string
    Password string
}

// GetID returns this command instance's unique id.
func (c *CreateUserCommand) GetID() string { return c.ID }

// GetType returns the routing key "CreateUser".
func (c *CreateUserCommand) GetType() string { return "CreateUser" }

// GetData returns the command itself as its payload.
func (c *CreateUserCommand) GetData() interface{} { return c }

// UserCreatedEvent announces that a user was created. AggregateID and UserID
// carry the same id when produced by CreateUserCommandHandler.
type UserCreatedEvent struct {
    ID          string
    AggregateID string
    UserID      string
    Username    string
    Email       string
    Timestamp   time.Time
}

// GetID returns this event instance's unique id.
func (e *UserCreatedEvent) GetID() string { return e.ID }

// GetType returns the routing key "UserCreated".
func (e *UserCreatedEvent) GetType() string { return "UserCreated" }

// GetAggregateID returns the id of the aggregate the event belongs to.
func (e *UserCreatedEvent) GetAggregateID() string { return e.AggregateID }

// GetData returns the event itself as its payload.
func (e *UserCreatedEvent) GetData() interface{} { return e }

// GetTimestamp returns when the event was created.
func (e *UserCreatedEvent) GetTimestamp() time.Time { return e.Timestamp }

// CreateUserCommandHandler handles CreateUserCommand against the write model.
type CreateUserCommandHandler struct {
    userRepository UserWriteRepository
}

// NewCreateUserCommandHandler returns a handler that persists users via repo.
func NewCreateUserCommandHandler(repo UserWriteRepository) *CreateUserCommandHandler {
    h := &CreateUserCommandHandler{}
    h.userRepository = repo
    return h
}

// Handle validates a *CreateUserCommand, rejects an email that is already
// taken, stores a new "active" user and returns one UserCreatedEvent for it.
//
// NOTE(review): ExistsByEmail and Create are separate calls, so two concurrent
// commands with the same email can both pass the check. Presumably a unique
// constraint in storage is the real guard — confirm. Username uniqueness is
// not checked here.
func (h *CreateUserCommandHandler) Handle(ctx context.Context, cmd Command) ([]Event, error) {
    c, ok := cmd.(*CreateUserCommand)
    if !ok {
        return nil, fmt.Errorf("invalid command type")
    }
    if c.Username == "" || c.Email == "" {
        return nil, fmt.Errorf("username and email are required")
    }

    taken, err := h.userRepository.ExistsByEmail(ctx, c.Email)
    switch {
    case err != nil:
        return nil, fmt.Errorf("failed to check user existence: %w", err)
    case taken:
        return nil, fmt.Errorf("user with email %s already exists", c.Email)
    }

    newID := uuid.New().String()
    // NOTE: the password is stored exactly as received; it should be hashed
    // before it reaches the write model.
    if err := h.userRepository.Create(ctx, &WriteUser{
        ID:       newID,
        Username: c.Username,
        Email:    c.Email,
        Password: c.Password,
        Status:   "active",
    }); err != nil {
        return nil, fmt.Errorf("failed to create user: %w", err)
    }

    return []Event{&UserCreatedEvent{
        ID:          uuid.New().String(),
        AggregateID: newID,
        UserID:      newID,
        Username:    c.Username,
        Email:       c.Email,
        Timestamp:   time.Now(),
    }}, nil
}

// GetUserQuery asks for a single user from the read model. ID identifies the
// query instance; UserID is the user being looked up.
type GetUserQuery struct {
    ID     string
    UserID string
}

// GetID returns this query instance's unique id.
func (q *GetUserQuery) GetID() string { return q.ID }

// GetType returns the routing key "GetUser".
func (q *GetUserQuery) GetType() string { return "GetUser" }

// GetCriteria returns the query itself as its criteria.
func (q *GetUserQuery) GetCriteria() interface{} { return q }

// GetUserQueryHandler answers GetUserQuery from the read model.
type GetUserQueryHandler struct {
    userRepository UserReadRepository
}

// NewGetUserQueryHandler returns a handler that reads users through repo.
func NewGetUserQueryHandler(repo UserReadRepository) *GetUserQueryHandler {
    return &GetUserQueryHandler{userRepository: repo}
}

// Handle looks up the user named by a *GetUserQuery and returns the
// repository's *ReadUser as the result.
func (h *GetUserQueryHandler) Handle(ctx context.Context, query Query) (interface{}, error) {
    q, ok := query.(*GetUserQuery)
    if !ok {
        return nil, fmt.Errorf("invalid query type")
    }

    found, err := h.userRepository.GetByID(ctx, q.UserID)
    if err == nil {
        return found, nil
    }
    return nil, fmt.Errorf("failed to get user: %w", err)
}

// UserCreatedEventHandler projects UserCreatedEvent into the read model.
type UserCreatedEventHandler struct {
    readRepository UserReadRepository
}

// NewUserCreatedEventHandler returns a handler that writes to repo.
func NewUserCreatedEventHandler(repo UserReadRepository) *UserCreatedEventHandler {
    return &UserCreatedEventHandler{readRepository: repo}
}

// Handle inserts a read-model user built from a *UserCreatedEvent. Both
// timestamps are set to the event's timestamp.
func (h *UserCreatedEventHandler) Handle(ctx context.Context, event Event) error {
    e, ok := event.(*UserCreatedEvent)
    if !ok {
        return fmt.Errorf("invalid event type")
    }

    if err := h.readRepository.Create(ctx, &ReadUser{
        ID:        e.UserID,
        Username:  e.Username,
        Email:     e.Email,
        Status:    "active",
        CreatedAt: e.Timestamp,
        UpdatedAt: e.Timestamp,
    }); err != nil {
        return fmt.Errorf("failed to create read user: %w", err)
    }

    log.Printf("Read model updated for user %s", e.UserID)
    return nil
}

// Data model definitions.

// WriteUser is the write-side (command) representation of a user.
// NOTE(review): CreateUserCommandHandler fills Password with the raw command
// password; no hashing is visible in this package.
type WriteUser struct {
    ID       string
    Username string
    Email    string
    Password string
    Status   string
}

// ReadUser is the read-side (query) representation of a user, shaped for JSON
// responses; it carries no password.
type ReadUser struct {
    ID        string    `json:"id"`
    Username  string    `json:"username"`
    Email     string    `json:"email"`
    Status    string    `json:"status"`
    CreatedAt time.Time `json:"created_at"`
    UpdatedAt time.Time `json:"updated_at"`
}

// UserWriteRepository persists the write model.
type UserWriteRepository interface {
    Create(ctx context.Context, user *WriteUser) error
    Update(ctx context.Context, user *WriteUser) error
    Delete(ctx context.Context, id string) error
    // ExistsByEmail reports whether a user with this email is already stored.
    ExistsByEmail(ctx context.Context, email string) (bool, error)
}

// UserReadRepository stores and serves the read model. Create/Update are used
// by event handlers that keep the projection current.
type UserReadRepository interface {
    GetByID(ctx context.Context, id string) (*ReadUser, error)
    GetByEmail(ctx context.Context, email string) (*ReadUser, error)
    List(ctx context.Context, limit, offset int) ([]*ReadUser, error)
    Create(ctx context.Context, user *ReadUser) error
    Update(ctx context.Context, user *ReadUser) error
}

// UserService is the CQRS application facade. Writes go through the command
// bus (and the resulting events through the event bus); reads go through the
// query bus.
type UserService struct {
    commandBus *CommandBus
    queryBus   *QueryBus
    eventBus   *EventBus
}

// NewUserService wires a UserService to the given buses.
func NewUserService(commandBus *CommandBus, queryBus *QueryBus, eventBus *EventBus) *UserService {
    svc := new(UserService)
    svc.commandBus, svc.queryBus, svc.eventBus = commandBus, queryBus, eventBus
    return svc
}

// CreateUser executes a CreateUserCommand and publishes the events it
// produced. Command errors are returned unwrapped.
func (s *UserService) CreateUser(ctx context.Context, username, email, password string) error {
    events, err := s.commandBus.Execute(ctx, &CreateUserCommand{
        ID:       uuid.New().String(),
        Username: username,
        Email:    email,
        Password: password,
    })
    if err != nil {
        return err
    }
    return s.eventBus.Publish(ctx, events)
}

// GetUser runs a GetUserQuery and expects a *ReadUser back.
func (s *UserService) GetUser(ctx context.Context, userID string) (*ReadUser, error) {
    result, err := s.queryBus.Execute(ctx, &GetUserQuery{
        ID:     uuid.New().String(),
        UserID: userID,
    })
    if err != nil {
        return nil, err
    }

    if user, ok := result.(*ReadUser); ok {
        return user, nil
    }
    return nil, fmt.Errorf("invalid result type")
}

事件溯源模式

事件溯源通过存储事件序列来重建应用状态,提供完整的审计跟踪。

// 事件溯源实现
package eventsourcing

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sort"
    "sync"
    "time"
    
    "github.com/google/uuid"
)

// Event 事件接口
type Event interface {
    GetEventID() string
    GetEventType() string
    GetAggregateID() string
    GetEventData() interface{}
    GetTimestamp() time.Time
    GetVersion() int
}

// EventStore persists and reads back per-aggregate event streams.
type EventStore interface {
    // SaveEvents appends events to aggregateID's stream. expectedVersion is an
    // optimistic-concurrency guard; InMemoryEventStore treats -1 as "no check".
    SaveEvents(ctx context.Context, aggregateID string, events []Event, expectedVersion int) error
    // GetEvents returns aggregateID's whole stream.
    GetEvents(ctx context.Context, aggregateID string) ([]Event, error)
    // GetEventsFromVersion returns the stream's events with version >= fromVersion.
    GetEventsFromVersion(ctx context.Context, aggregateID string, fromVersion int) ([]Event, error)
    // GetAllEvents returns events of every aggregate whose timestamp is at or
    // after fromTimestamp, ordered by timestamp.
    GetAllEvents(ctx context.Context, fromTimestamp time.Time) ([]Event, error)
}

// Aggregate is an event-sourced aggregate root. State changes are recorded
// as uncommitted events, persisted by a Repository, and replayed on load.
type Aggregate interface {
    // GetID returns the aggregate's ID, which is also its event-stream key.
    GetID() string
    // GetVersion returns the aggregate's version (committed/replayed event count).
    GetVersion() int
    // GetUncommittedEvents returns events raised since the last save.
    GetUncommittedEvents() []Event
    // MarkEventsAsCommitted is called once the uncommitted events are stored.
    MarkEventsAsCommitted()
    // LoadFromHistory rebuilds state by replaying stored events in order.
    LoadFromHistory(events []Event)
}

// Repository loads and stores aggregates.
type Repository interface {
    // Save persists the aggregate's uncommitted events; expectedVersion is the
    // stream version the caller believes is current (EventSourcingRepository
    // forwards it to EventStore.SaveEvents).
    Save(ctx context.Context, aggregate Aggregate, expectedVersion int) error
    // GetByID rebuilds the aggregate with the given ID.
    GetByID(ctx context.Context, id string) (Aggregate, error)
}

// BaseEvent is the general-purpose Event implementation used in this package.
// Its Version field is overwritten by InMemoryEventStore.SaveEvents when the
// event is stored.
type BaseEvent struct {
    EventID     string      `json:"event_id"`
    EventType   string      `json:"event_type"`
    AggregateID string      `json:"aggregate_id"`
    EventData   interface{} `json:"event_data"`
    Timestamp   time.Time   `json:"timestamp"`
    Version     int         `json:"version"`
}

// GetEventID implements Event.
func (e *BaseEvent) GetEventID() string {
    return e.EventID
}

// GetEventType implements Event.
func (e *BaseEvent) GetEventType() string {
    return e.EventType
}

// GetAggregateID implements Event.
func (e *BaseEvent) GetAggregateID() string {
    return e.AggregateID
}

// GetEventData implements Event.
func (e *BaseEvent) GetEventData() interface{} {
    return e.EventData
}

// GetTimestamp implements Event.
func (e *BaseEvent) GetTimestamp() time.Time {
    return e.Timestamp
}

// GetVersion implements Event.
func (e *BaseEvent) GetVersion() int {
    return e.Version
}

// InMemoryEventStore is an EventStore held in process memory, keyed by
// aggregate ID. Every method takes mutex, so it is safe for concurrent use.
type InMemoryEventStore struct {
    events map[string][]Event
    mutex  sync.RWMutex
}

// NewInMemoryEventStore returns an empty store ready for use.
func NewInMemoryEventStore() *InMemoryEventStore {
    store := &InMemoryEventStore{}
    store.events = map[string][]Event{}
    return store
}

// SaveEvents appends events to aggregateID's stream under optimistic
// concurrency control.
//
// The stream's current version is its length. Unless expectedVersion is -1,
// it must equal that length. Otherwise a "concurrency conflict" error is
// returned and nothing is stored. Each *BaseEvent in events has its Version
// overwritten, in place on the caller's event, with its 1-based stream
// position. Other Event implementations are stored as-is.
func (es *InMemoryEventStore) SaveEvents(ctx context.Context, aggregateID string, events []Event, expectedVersion int) error {
    es.mutex.Lock()
    defer es.mutex.Unlock()

    stream := es.events[aggregateID]
    currentVersion := len(stream)

    if expectedVersion != -1 && expectedVersion != currentVersion {
        return fmt.Errorf("concurrency conflict: expected version %d, but current version is %d", expectedVersion, currentVersion)
    }

    for offset, event := range events {
        be, ok := event.(*BaseEvent)
        if !ok {
            continue
        }
        be.Version = currentVersion + offset + 1
    }

    // Storing even an empty batch creates the key, as before.
    es.events[aggregateID] = append(stream, events...)

    log.Printf("Saved %d events for aggregate %s", len(events), aggregateID)
    return nil
}

// GetEvents returns a copy of aggregateID's stream in append order. An
// unknown aggregate yields an empty, non-nil slice. The error is always nil.
func (es *InMemoryEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
    es.mutex.RLock()
    defer es.mutex.RUnlock()

    stream, ok := es.events[aggregateID]
    if !ok {
        return []Event{}, nil
    }
    // Copy so callers cannot mutate the stored slice.
    return append(make([]Event, 0, len(stream)), stream...), nil
}

// GetEventsFromVersion returns the events of aggregateID's stream whose
// version is fromVersion or later, in stream order. It returns nil when none
// qualify.
func (es *InMemoryEventStore) GetEventsFromVersion(ctx context.Context, aggregateID string, fromVersion int) ([]Event, error) {
    stream, err := es.GetEvents(ctx, aggregateID)
    if err != nil {
        return nil, err
    }

    var tail []Event
    for i := range stream {
        if stream[i].GetVersion() < fromVersion {
            continue
        }
        tail = append(tail, stream[i])
    }
    return tail, nil
}

// GetAllEvents returns every stored event whose timestamp is at or after
// fromTimestamp, across all aggregates, oldest first.
//
// Ties on timestamp are broken by aggregate ID and then by version, and the
// sort is stable. Events from one aggregate therefore stay in stream order
// even when the clock did not advance between them, and the overall order is
// deterministic. The previous unstable sort.Slice over map-iteration order
// guaranteed neither.
func (es *InMemoryEventStore) GetAllEvents(ctx context.Context, fromTimestamp time.Time) ([]Event, error) {
    es.mutex.RLock()
    defer es.mutex.RUnlock()

    var allEvents []Event
    for _, events := range es.events {
        for _, event := range events {
            // Inclusive lower bound: keep events at exactly fromTimestamp.
            if !event.GetTimestamp().Before(fromTimestamp) {
                allEvents = append(allEvents, event)
            }
        }
    }

    sort.SliceStable(allEvents, func(i, j int) bool {
        a, b := allEvents[i], allEvents[j]
        if ta, tb := a.GetTimestamp(), b.GetTimestamp(); !ta.Equal(tb) {
            return ta.Before(tb)
        }
        if a.GetAggregateID() != b.GetAggregateID() {
            return a.GetAggregateID() < b.GetAggregateID()
        }
        return a.GetVersion() < b.GetVersion()
    })

    return allEvents, nil
}

// BaseAggregate holds the bookkeeping shared by event-sourced aggregates and
// is meant to be embedded (UserAggregate embeds it).
type BaseAggregate struct {
    ID                string
    // Version counts events already persisted or replayed.
    Version           int
    // UncommittedEvents are events raised since the last successful save.
    UncommittedEvents []Event
}

// GetID returns the aggregate's identifier.
func (a *BaseAggregate) GetID() string {
    return a.ID
}

// GetVersion returns the aggregate's current version.
func (a *BaseAggregate) GetVersion() int {
    return a.Version
}

// GetUncommittedEvents returns the pending events. The internal slice is
// returned without copying, so callers must not modify it.
func (a *BaseAggregate) GetUncommittedEvents() []Event {
    return a.UncommittedEvents
}

// MarkEventsAsCommitted advances Version by the number of pending events and
// resets the pending list to empty. Call it only after those events were
// persisted.
func (a *BaseAggregate) MarkEventsAsCommitted() {
    pending := len(a.UncommittedEvents)
    a.UncommittedEvents = []Event{}
    a.Version += pending
}

// ApplyEvent queues event as uncommitted. Despite its name it does not change
// any aggregate state. Embedding types apply state changes themselves (e.g.
// UserAggregate.applyEvent).
func (a *BaseAggregate) ApplyEvent(event Event) {
    a.UncommittedEvents = append(a.UncommittedEvents, event)
}

// EventSourcingRepository implements Repository on top of an EventStore:
// it saves uncommitted events and rebuilds aggregates by replaying them.
type EventSourcingRepository struct {
    eventStore    EventStore
    // aggregateType returns a fresh, empty aggregate to replay events into.
    aggregateType func() Aggregate
}

// NewEventSourcingRepository builds a repository over eventStore. The
// aggregateType factory is called once per GetByID to get an empty aggregate.
func NewEventSourcingRepository(eventStore EventStore, aggregateType func() Aggregate) *EventSourcingRepository {
    repo := new(EventSourcingRepository)
    repo.eventStore = eventStore
    repo.aggregateType = aggregateType
    return repo
}

// Save stores the aggregate's uncommitted events under its ID, passing
// expectedVersion to the event store's concurrency check. On success it marks
// the events committed. An aggregate with nothing pending is a no-op.
func (r *EventSourcingRepository) Save(ctx context.Context, aggregate Aggregate, expectedVersion int) error {
    pending := aggregate.GetUncommittedEvents()
    if len(pending) > 0 {
        if err := r.eventStore.SaveEvents(ctx, aggregate.GetID(), pending, expectedVersion); err != nil {
            return fmt.Errorf("failed to save events: %w", err)
        }
        aggregate.MarkEventsAsCommitted()
    }
    return nil
}

// GetByID rebuilds aggregate id by replaying its stored events into a fresh
// aggregate from the factory. A stream with no events is reported as "aggregate
// not found".
//
// NOTE(review): the factory receives no ID, so the aggregate's LoadFromHistory
// must restore its ID from the events; otherwise later saves target the
// wrong stream.
func (r *EventSourcingRepository) GetByID(ctx context.Context, id string) (Aggregate, error) {
    history, err := r.eventStore.GetEvents(ctx, id)
    switch {
    case err != nil:
        return nil, fmt.Errorf("failed to get events: %w", err)
    case len(history) == 0:
        return nil, fmt.Errorf("aggregate not found: %s", id)
    }

    agg := r.aggregateType()
    agg.LoadFromHistory(history)
    return agg, nil
}

// Example aggregate: a user.

// UserAggregate is an event-sourced user. Its fields are changed only by
// applying events (see applyEvent).
type UserAggregate struct {
    BaseAggregate
    Username string
    Email    string
    // Status is "" before creation, then "active" or "inactive".
    Status   string
}

// NewUserAggregate returns a not-yet-created user aggregate with the given ID,
// version 0 and no pending events.
func NewUserAggregate(id string) *UserAggregate {
    agg := &UserAggregate{}
    agg.ID = id
    return agg
}

// LoadFromHistory rebuilds the user's state by replaying events in order and
// bumps Version once per event.
//
// If the aggregate has no ID yet, it adopts the events' AggregateID.
// Repositories build aggregates through a zero-argument factory (see
// EventSourcingRepository.GetByID). Without this, a reloaded UserAggregate had
// an empty ID. Every event it raised afterwards then carried AggregateID "",
// and Save targeted the "" stream instead of the user's own.
func (u *UserAggregate) LoadFromHistory(events []Event) {
    for _, event := range events {
        if u.ID == "" {
            u.ID = event.GetAggregateID()
        }
        u.applyEvent(event)
        u.Version++
    }
}

// CreateUser raises a UserCreated event for a user that does not exist yet
// (Status still ""). The event is applied to this aggregate's fields at once
// and queued as uncommitted for the repository to persist.
func (u *UserAggregate) CreateUser(username, email string) error {
    if u.Status != "" {
        return fmt.Errorf("user already exists")
    }

    payload := map[string]interface{}{
        "username": username,
        "email":    email,
    }
    created := &BaseEvent{
        EventID:     uuid.New().String(),
        EventType:   "UserCreated",
        AggregateID: u.ID,
        EventData:   payload,
        Timestamp:   time.Now(),
    }

    u.applyEvent(created) // update state
    u.ApplyEvent(created) // queue for persistence
    return nil
}

// UpdateEmail raises a UserEmailUpdated event carrying the old and new
// addresses. It fails for a user that is not active. Setting the same address
// again is a no-op that raises no event.
func (u *UserAggregate) UpdateEmail(newEmail string) error {
    switch {
    case u.Status != "active":
        return fmt.Errorf("user is not active")
    case u.Email == newEmail:
        return nil // unchanged: nothing to record
    }

    changed := &BaseEvent{
        EventID:     uuid.New().String(),
        EventType:   "UserEmailUpdated",
        AggregateID: u.ID,
        EventData: map[string]interface{}{
            "old_email": u.Email,
            "new_email": newEmail,
        },
        Timestamp: time.Now(),
    }

    u.applyEvent(changed) // update state
    u.ApplyEvent(changed) // queue for persistence
    return nil
}

// DeactivateUser raises a UserDeactivated event (empty payload) for an active
// user. It fails if the user is not active.
func (u *UserAggregate) DeactivateUser() error {
    if u.Status == "active" {
        deactivated := &BaseEvent{
            EventID:     uuid.New().String(),
            EventType:   "UserDeactivated",
            AggregateID: u.ID,
            EventData:   map[string]interface{}{},
            Timestamp:   time.Now(),
        }
        u.applyEvent(deactivated) // update state
        u.ApplyEvent(deactivated) // queue for persistence
        return nil
    }
    return fmt.Errorf("user is not active")
}

// applyEvent updates the user's fields from a single event. Unknown event
// types are ignored. For UserCreated and UserEmailUpdated the payload must be
// a map[string]interface{} with string values. Otherwise the type assertions
// panic.
func (u *UserAggregate) applyEvent(event Event) {
    eventType := event.GetEventType()
    if eventType == "UserDeactivated" {
        u.Status = "inactive"
        return
    }
    if eventType != "UserCreated" && eventType != "UserEmailUpdated" {
        return
    }

    fields := event.GetEventData().(map[string]interface{})
    if eventType == "UserEmailUpdated" {
        u.Email = fields["new_email"].(string)
        return
    }
    u.Username = fields["username"].(string)
    u.Email = fields["email"].(string)
    u.Status = "active"
}

// EventProjector feeds stored events to per-event-type projection handlers.
// It remembers the newest timestamp it has processed (the watermark), so each
// ProcessEvents call handles only events it has not handled before.
type EventProjector struct {
    eventStore             EventStore
    projections            map[string]func(Event) error
    lastProcessedTimestamp time.Time
    // processedAtWatermark holds the IDs of already-handled events whose
    // timestamp equals lastProcessedTimestamp. GetAllEvents includes events at
    // exactly its lower bound, so without this set the newest event(s) were
    // re-projected on every call.
    processedAtWatermark map[string]struct{}
    mutex                sync.RWMutex
}

// NewEventProjector returns a projector over eventStore with no registered
// projections, starting from the zero time.
func NewEventProjector(eventStore EventStore) *EventProjector {
    return &EventProjector{
        eventStore:           eventStore,
        projections:          make(map[string]func(Event) error),
        processedAtWatermark: make(map[string]struct{}),
    }
}

// RegisterProjection installs handler for eventType, replacing any handler
// previously registered for that type.
func (ep *EventProjector) RegisterProjection(eventType string, handler func(Event) error) {
    ep.mutex.Lock()
    defer ep.mutex.Unlock()
    ep.projections[eventType] = handler
}

// ProcessEvents fetches every event at or after the watermark. It skips those
// already handled and passes the rest to the handler registered for their
// type. Events with no handler are simply marked processed.
//
// Handler errors are logged and the event is skipped (best effort). A failed
// event does not advance the watermark, but once a later event is processed
// the failed one falls behind the watermark and is not retried.
//
// It returns an error only if reading from the event store fails.
func (ep *EventProjector) ProcessEvents(ctx context.Context) error {
    ep.mutex.RLock()
    fromTimestamp := ep.lastProcessedTimestamp
    ep.mutex.RUnlock()

    events, err := ep.eventStore.GetAllEvents(ctx, fromTimestamp)
    if err != nil {
        return fmt.Errorf("failed to get events: %w", err)
    }

    for _, event := range events {
        ep.mutex.RLock()
        _, alreadyDone := ep.processedAtWatermark[event.GetEventID()]
        alreadyDone = alreadyDone && event.GetTimestamp().Equal(ep.lastProcessedTimestamp)
        handler, exists := ep.projections[event.GetEventType()]
        ep.mutex.RUnlock()

        if alreadyDone {
            continue
        }

        if exists {
            if err := handler(event); err != nil {
                log.Printf("Error processing event %s: %v", event.GetEventID(), err)
                continue
            }
        }

        ep.markProcessed(event)
    }

    return nil
}

// markProcessed advances the watermark to event's timestamp (if newer) and
// records the event's ID as handled at the watermark.
func (ep *EventProjector) markProcessed(event Event) {
    ep.mutex.Lock()
    defer ep.mutex.Unlock()

    ts := event.GetTimestamp()
    switch {
    case ts.After(ep.lastProcessedTimestamp):
        ep.lastProcessedTimestamp = ts
        ep.processedAtWatermark = map[string]struct{}{event.GetEventID(): {}}
    case ts.Equal(ep.lastProcessedTimestamp):
        if ep.processedAtWatermark == nil {
            ep.processedAtWatermark = make(map[string]struct{})
        }
        ep.processedAtWatermark[event.GetEventID()] = struct{}{}
    }
}

// ExampleEventSourcing walks through an event-sourcing round trip:
//  1. create a user aggregate and persist its events;
//  2. reload it from the store;
//  3. change its email and persist again.
//
// Any failure is logged and ends the example early.
func ExampleEventSourcing() {
    store := NewInMemoryEventStore()
    repo := NewEventSourcingRepository(store, func() Aggregate {
        return &UserAggregate{}
    })
    ctx := context.Background()

    id := uuid.New().String()
    fresh := NewUserAggregate(id)

    if err := fresh.CreateUser("john_doe", "john@example.com"); err != nil {
        log.Printf("Error creating user: %v", err)
        return
    }
    // -1 skips the optimistic-concurrency check for a brand-new stream.
    if err := repo.Save(ctx, fresh, -1); err != nil {
        log.Printf("Error saving user: %v", err)
        return
    }

    reloaded, err := repo.GetByID(ctx, id)
    if err != nil {
        log.Printf("Error loading user: %v", err)
        return
    }
    user := reloaded.(*UserAggregate)
    log.Printf("Loaded user: %s, %s, %s", user.Username, user.Email, user.Status)

    if err := user.UpdateEmail("john.doe@example.com"); err != nil {
        log.Printf("Error updating email: %v", err)
        return
    }
    // The version observed at load time is the expected stream version.
    if err := repo.Save(ctx, user, user.GetVersion()); err != nil {
        log.Printf("Error saving updated user: %v", err)
        return
    }

    log.Printf("User updated successfully")
}

数据一致性保证

最终一致性

在分布式系统中,最终一致性是一种常见的一致性模型。

// 最终一致性实现
package consistency

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// ConsistencyLevel names a consistency model.
// NOTE(review): not referenced by any code in this listing.
type ConsistencyLevel int

const (
    ConsistencyLevelEventual ConsistencyLevel = iota
    ConsistencyLevelStrong
    ConsistencyLevelWeak
)

// DataSynchronizer fans writes out to a set of named replicas and
// periodically reconciles them. mutex guards the replicas map.
type DataSynchronizer struct {
    replicas map[string]DataReplica
    mutex    sync.RWMutex
    config   SyncConfig
}

// DataReplica is one copy of the key/value data managed by DataSynchronizer.
type DataReplica interface {
    // Write stores value under key.
    Write(ctx context.Context, key string, value interface{}) error
    // Read returns the value stored under key, or an error if it is absent.
    Read(ctx context.Context, key string) (interface{}, error)
    // GetLastModified returns when key was last written or synced.
    GetLastModified(ctx context.Context, key string) (time.Time, error)
    // Sync applies already-resolved changes coming from the synchronizer.
    Sync(ctx context.Context, changes []DataChange) error
}

// DataChange is one key's new value as exchanged during synchronization.
type DataChange struct {
    Key       string
    Value     interface{}
    // Timestamp is what the default last-writer-wins resolution compares.
    Timestamp time.Time
    // Version is carried along but not read by the code in this listing.
    Version   int64
}

// SyncConfig tunes a DataSynchronizer.
type SyncConfig struct {
    // SyncInterval is the period of the background reconciliation loop; it
    // must be positive (time.NewTicker panics otherwise).
    SyncInterval    time.Duration
    // RetryAttempts is the maximum number of write attempts per replica.
    RetryAttempts   int
    // RetryDelay is the pause between write attempts.
    RetryDelay      time.Duration
    // ConflictResolver merges two competing changes for the same key; when
    // nil, the change with the latest Timestamp wins.
    ConflictResolver func(local, remote DataChange) DataChange
}

// NewDataSynchronizer returns a synchronizer with the given configuration and
// no replicas registered yet.
func NewDataSynchronizer(config SyncConfig) *DataSynchronizer {
    ds := &DataSynchronizer{config: config}
    ds.replicas = map[string]DataReplica{}
    return ds
}

// RegisterReplica adds replica under name. A replica already registered
// under that name is replaced. Safe for concurrent use.
func (ds *DataSynchronizer) RegisterReplica(name string, replica DataReplica) {
    ds.mutex.Lock()
    defer ds.mutex.Unlock()

    ds.replicas[name] = replica
}

// WriteWithEventualConsistency writes key=value to every registered replica
// in parallel and succeeds when a strict majority of replicas accepted it.
// For example, 2 of 3 or 3 of 4 must succeed, and a single replica must
// succeed on its own.
//
// Each replica gets up to config.RetryAttempts attempts, and at least one even
// when RetryAttempts <= 0. Attempts are separated by config.RetryDelay. The
// wait between attempts is abandoned if ctx is cancelled. The call returns
// only after every per-replica goroutine has finished.
//
// It returns an error when no replica is registered, or when fewer than a
// strict majority succeeded; in the latter case the error wraps the last
// replica error observed.
//
// Previously:
//   - the majority check (errorCount < len/2) rejected 2-of-3 successes and
//     every single-replica write;
//   - a non-positive RetryAttempts skipped the write yet reported success.
func (ds *DataSynchronizer) WriteWithEventualConsistency(ctx context.Context, key string, value interface{}) error {
    ds.mutex.RLock()
    replicas := make([]DataReplica, 0, len(ds.replicas))
    for _, replica := range ds.replicas {
        replicas = append(replicas, replica)
    }
    ds.mutex.RUnlock()

    if len(replicas) == 0 {
        return fmt.Errorf("write failed: no replicas registered")
    }

    attempts := ds.config.RetryAttempts
    if attempts < 1 {
        attempts = 1
    }

    var wg sync.WaitGroup
    failures := make(chan error, len(replicas))

    for _, replica := range replicas {
        wg.Add(1)
        go func(r DataReplica) {
            defer wg.Done()
            if err := ds.writeWithRetry(ctx, r, key, value, attempts); err != nil {
                failures <- err
            }
        }(replica)
    }

    wg.Wait()
    close(failures)

    var lastError error
    errorCount := 0
    for err := range failures {
        lastError = err
        errorCount++
    }

    if successCount := len(replicas) - errorCount; successCount > len(replicas)/2 {
        return nil
    }
    return fmt.Errorf("write failed on %d replicas, last error: %w", errorCount, lastError)
}

// writeWithRetry calls r.Write up to attempts times, waiting RetryDelay
// between attempts. It returns nil on the first success or the last error.
// If ctx is cancelled while waiting, it returns an error wrapping ctx.Err().
func (ds *DataSynchronizer) writeWithRetry(ctx context.Context, r DataReplica, key string, value interface{}, attempts int) error {
    var err error
    for attempt := 0; attempt < attempts; attempt++ {
        if err = r.Write(ctx, key, value); err == nil {
            return nil
        }
        log.Printf("Write attempt %d failed: %v", attempt+1, err)
        if attempt == attempts-1 {
            break
        }

        timer := time.NewTimer(ds.config.RetryDelay)
        select {
        case <-ctx.Done():
            timer.Stop()
            return fmt.Errorf("write retry aborted (last error: %v): %w", err, ctx.Err())
        case <-timer.C:
        }
    }
    return err
}

// ReadWithEventualConsistency returns the value from the first replica that
// answers without error. Replicas are tried in map-iteration order, which is
// unspecified, so the value may be stale on a lagging replica. Failed reads are
// logged. An error is returned only if every replica fails.
func (ds *DataSynchronizer) ReadWithEventualConsistency(ctx context.Context, key string) (interface{}, error) {
    ds.mutex.RLock()
    snapshot := make([]DataReplica, 0, len(ds.replicas))
    for _, r := range ds.replicas {
        snapshot = append(snapshot, r)
    }
    ds.mutex.RUnlock()

    for i := range snapshot {
        value, err := snapshot[i].Read(ctx, key)
        if err != nil {
            log.Printf("Read from replica failed: %v", err)
            continue
        }
        return value, nil
    }
    return nil, fmt.Errorf("failed to read from all replicas")
}

// StartSynchronization runs a reconciliation pass every config.SyncInterval
// until ctx is done. It blocks, so callers usually start it in its own
// goroutine. Pass errors are logged, not returned. A non-positive
// SyncInterval makes time.NewTicker panic.
func (ds *DataSynchronizer) StartSynchronization(ctx context.Context) {
    ticker := time.NewTicker(ds.config.SyncInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            if err := ds.synchronizeReplicas(ctx); err != nil {
                log.Printf("Synchronization failed: %v", err)
            }
        case <-ctx.Done():
            return
        }
    }
}

// synchronizeReplicas runs one reconciliation pass:
//  1. gather each replica's pending changes;
//  2. resolve conflicting changes per key;
//  3. push each winner to every replica.
//
// Per-replica Sync failures are logged, not returned. It returns ctx.Err() if
// ctx is cancelled between keys.
//
// NOTE: change collection is still a stub. DataReplica has no way to list
// changes since the last pass, so every replica contributes an empty set and
// the pass currently pushes nothing.
func (ds *DataSynchronizer) synchronizeReplicas(ctx context.Context) error {
    ds.mutex.RLock()
    replicas := make(map[string]DataReplica, len(ds.replicas))
    for name, replica := range ds.replicas {
        replicas[name] = replica
    }
    ds.mutex.RUnlock()

    // Only replica names are needed until change listing exists. The original
    // also bound the unused replica value (and below, the unused key), which
    // Go rejects as "declared and not used".
    allChanges := make(map[string]map[string]DataChange, len(replicas))
    for name := range replicas {
        allChanges[name] = make(map[string]DataChange)
    }

    for _, replicaChanges := range ds.collectChanges(allChanges) {
        if err := ctx.Err(); err != nil {
            return err
        }
        resolvedChange := ds.resolveConflicts(replicaChanges)

        for name, replica := range replicas {
            if err := replica.Sync(ctx, []DataChange{resolvedChange}); err != nil {
                log.Printf("Failed to sync change to replica %s: %v", name, err)
            }
        }
    }

    return nil
}

// collectChanges regroups per-replica changes (replica -> key -> change)
// into per-key lists (key -> changes from every replica that touched it).
// The order within each list follows map iteration and is unspecified.
func (ds *DataSynchronizer) collectChanges(allChanges map[string]map[string]DataChange) map[string][]DataChange {
    grouped := map[string][]DataChange{}
    for _, perReplica := range allChanges {
        for k, c := range perReplica {
            grouped[k] = append(grouped[k], c)
        }
    }
    return grouped
}

// resolveConflicts reduces competing changes for one key to a single winner:
//   - no changes: the zero DataChange;
//   - one change: that change, without calling the resolver;
//   - otherwise, with config.ConflictResolver set: a left fold of it over the
//     changes in order;
//   - otherwise: the change with the latest Timestamp (the earliest listed
//     wins ties).
func (ds *DataSynchronizer) resolveConflicts(changes []DataChange) DataChange {
    switch len(changes) {
    case 0:
        return DataChange{}
    case 1:
        return changes[0]
    }

    if resolve := ds.config.ConflictResolver; resolve != nil {
        winner := changes[0]
        for _, next := range changes[1:] {
            winner = resolve(winner, next)
        }
        return winner
    }

    winner := changes[0]
    for i := 1; i < len(changes); i++ {
        if changes[i].Timestamp.After(winner.Timestamp) {
            winner = changes[i]
        }
    }
    return winner
}

// InMemoryReplica is a DataReplica backed by process-local maps (used, for
// example, in ExampleEventualConsistency). Every method takes mutex, so it is
// safe for concurrent use.
type InMemoryReplica struct {
    data         map[string]interface{}
    lastModified map[string]time.Time
    mutex        sync.RWMutex
    // name only labels log lines.
    name         string
}

// NewInMemoryReplica returns an empty replica whose log lines are tagged
// with name.
func NewInMemoryReplica(name string) *InMemoryReplica {
    r := &InMemoryReplica{name: name}
    r.data = map[string]interface{}{}
    r.lastModified = map[string]time.Time{}
    return r
}

// Write stores value under key and stamps it with the current time. It never
// returns an error.
func (r *InMemoryReplica) Write(ctx context.Context, key string, value interface{}) error {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    r.data[key], r.lastModified[key] = value, time.Now()
    log.Printf("[%s] Written key %s", r.name, key)
    return nil
}

// Read returns the value stored under key, or a "key not found" error.
func (r *InMemoryReplica) Read(ctx context.Context, key string) (interface{}, error) {
    r.mutex.RLock()
    defer r.mutex.RUnlock()

    if value, ok := r.data[key]; ok {
        log.Printf("[%s] Read key %s", r.name, key)
        return value, nil
    }
    return nil, fmt.Errorf("key not found: %s", key)
}

// GetLastModified returns when key was last written or synced, or a
// "key not found" error if it never was.
func (r *InMemoryReplica) GetLastModified(ctx context.Context, key string) (time.Time, error) {
    r.mutex.RLock()
    defer r.mutex.RUnlock()

    if ts, ok := r.lastModified[key]; ok {
        return ts, nil
    }
    return time.Time{}, fmt.Errorf("key not found: %s", key)
}

// Sync applies changes in order, overwriting each key's value and timestamp
// with the change's. A later change for the same key wins. It does not compare
// against the locally stored timestamp, so an older change overwrites a newer
// local write. It never returns an error.
func (r *InMemoryReplica) Sync(ctx context.Context, changes []DataChange) error {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    for i := range changes {
        c := &changes[i]
        r.data[c.Key] = c.Value
        r.lastModified[c.Key] = c.Timestamp
        log.Printf("[%s] Synced key %s", r.name, c.Key)
    }
    return nil
}

使用示例

// ExampleEventualConsistency demonstrates DataSynchronizer with three
// in-memory replicas. It starts the background synchronization loop, writes a
// record through the synchronizer and reads it back. Failures are logged and
// end the example early.
func ExampleEventualConsistency() {
    config := SyncConfig{
        SyncInterval:  time.Second * 30,
        RetryAttempts: 3,
        RetryDelay:    time.Millisecond * 100,
        // Last-writer-wins: keep whichever change has the later timestamp.
        ConflictResolver: func(local, remote DataChange) DataChange {
            if remote.Timestamp.After(local.Timestamp) {
                return remote
            }
            return local
        },
    }

    synchronizer := NewDataSynchronizer(config)
    for _, name := range []string{"replica-1", "replica-2", "replica-3"} {
        synchronizer.RegisterReplica(name, NewInMemoryReplica(name))
    }

    // Cancelled on return so the synchronization goroutine exits. It used to
    // run on context.Background() and tick forever after the example ended.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go synchronizer.StartSynchronization(ctx)

    err := synchronizer.WriteWithEventualConsistency(ctx, "user:123", map[string]interface{}{
        "name":  "John Doe",
        "email": "john@example.com",
    })
    if err != nil {
        log.Printf("Write failed: %v", err)
        return
    }

    // No sleep is needed here. The write above already waited for every
    // replica, and the background pass (every 30s) would not run within a
    // short wait anyway.
    value, err := synchronizer.ReadWithEventualConsistency(ctx, "user:123")
    if err != nil {
        log.Printf("Read failed: %v", err)
        return
    }

    log.Printf("Read value: %+v", value)
}

本章总结

本章深入探讨了微服务架构中的数据管理与一致性问题,涵盖了以下关键内容:

数据库模式

  1. 每服务一个数据库模式

    • 提供了完全的数据隔离和技术栈自由
    • 避免了服务间的数据耦合
    • 需要处理分布式事务和数据一致性问题
  2. 共享数据库模式

    • 简化了事务处理和数据一致性
    • 可能导致服务间的紧耦合
    • 适用于过渡期或特定场景

分布式事务处理

  1. Saga模式

    • 通过补偿操作处理长事务
    • 支持编排(Orchestration)和协同(Choreography)两种实现方式
    • 提供了良好的可扩展性和容错性
  2. 两阶段提交(2PC)

    • 保证强一致性
    • 存在阻塞和单点故障风险
    • 适用于对一致性要求极高的场景

架构模式

  1. CQRS模式

    • 分离命令和查询职责
    • 优化读写性能
    • 支持复杂的业务逻辑和报表需求
  2. 事件溯源模式

    • 通过事件序列重建状态
    • 提供完整的审计跟踪
    • 支持时间旅行和状态回溯

数据一致性保证

  1. 最终一致性
    • 在分布式环境中平衡性能和一致性
    • 通过异步同步和冲突解决机制使各副本最终收敛到一致状态
    • 适用于大多数业务场景

最佳实践

  1. 选择合适的一致性模型

    • 根据业务需求选择强一致性或最终一致性
    • 考虑性能、可用性和复杂性的权衡
  2. 设计幂等操作

    • 确保重试操作的安全性
    • 使用唯一标识符防止重复处理
  3. 实现补偿机制

    • 为每个操作设计相应的补偿操作
    • 确保系统能够从失败中恢复
  4. 监控和可观测性

    • 监控事务状态和数据一致性
    • 实现分布式追踪和日志聚合

下一章我们将探讨微服务的安全与认证,包括身份认证、授权机制、API安全和数据保护等内容。

共享数据库模式

在某些情况下,多个服务可能需要共享数据库,但需要严格的访问控制。

# Shared-database configuration (ConfigMap).
# Each service connects with its own role and is confined to its own schema;
# analytics_service is listed with SELECT-only access to other services' tables.
# NOTE(review): this map only describes the intended access; enforcement
# presumably comes from the SQL permission script that follows — confirm.
apiVersion: v1
kind: ConfigMap
metadata:
  name: shared-db-config
  namespace: microservices
data:
  database.yml: |
    shared_database:
      host: shared-postgres
      port: 5432
      database: shared_data
      
    services:
      user_service:
        username: user_service
        schema: user_service
        permissions:
          - SELECT
          - INSERT
          - UPDATE
          - DELETE
        tables:
          - users
          - user_profiles
          
      order_service:
        username: order_service
        schema: order_service
        permissions:
          - SELECT
          - INSERT
          - UPDATE
          - DELETE
        tables:
          - orders
          - order_items
          
      analytics_service:
        username: analytics_service
        schema: analytics
        permissions:
          - SELECT
        tables:
          - user_service.users
          - order_service.orders
          - order_service.order_items
-- Shared-database permission setup script
-- shared-db-permissions.sql
--
-- Assumes it runs as a role allowed to create roles and schemas (e.g. a
-- superuser), and that user_service.users already exists when the
-- row-level-security section at the end runs.

-- Service-specific login roles.
-- SECURITY: these literal passwords are placeholders committed to source.
-- Replace them (e.g. inject from a secret store at deploy time) before use.
CREATE USER user_service WITH PASSWORD 'user_service_password';
CREATE USER order_service WITH PASSWORD 'order_service_password';
CREATE USER analytics_service WITH PASSWORD 'analytics_service_password';

-- One schema per service, owned by that service's role.
CREATE SCHEMA IF NOT EXISTS user_service AUTHORIZATION user_service;
CREATE SCHEMA IF NOT EXISTS order_service AUTHORIZATION order_service;
CREATE SCHEMA IF NOT EXISTS analytics AUTHORIZATION analytics_service;

-- Default privileges for tables that the role running this script creates
-- later in each schema (ALTER DEFAULT PRIVILEGES without FOR ROLE only
-- applies to objects created by the executing role).
ALTER DEFAULT PRIVILEGES IN SCHEMA user_service
    GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO user_service;

ALTER DEFAULT PRIVILEGES IN SCHEMA order_service
    GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO order_service;

ALTER DEFAULT PRIVILEGES IN SCHEMA analytics
    GRANT SELECT ON TABLES TO analytics_service;

-- Read-only access for the analytics service.
GRANT USAGE ON SCHEMA user_service TO analytics_service;
GRANT USAGE ON SCHEMA order_service TO analytics_service;

-- These cover only the tables that exist right now...
GRANT SELECT ON ALL TABLES IN SCHEMA user_service TO analytics_service;
GRANT SELECT ON ALL TABLES IN SCHEMA order_service TO analytics_service;

-- ...so also grant SELECT on tables the owning services create later.
-- Without this, analytics_service had no access to any table added after
-- this script ran. The executing role must be a member of (or superuser over)
-- user_service and order_service.
ALTER DEFAULT PRIVILEGES FOR ROLE user_service IN SCHEMA user_service
    GRANT SELECT ON TABLES TO analytics_service;

ALTER DEFAULT PRIVILEGES FOR ROLE order_service IN SCHEMA order_service
    GRANT SELECT ON TABLES TO analytics_service;

-- Row-level security on user_service.users.
-- NOTE: the table owner bypasses RLS unless FORCE ROW LEVEL SECURITY is set.
ALTER TABLE user_service.users ENABLE ROW LEVEL SECURITY;

-- user_service may see and modify every row.
CREATE POLICY user_service_policy ON user_service.users
    FOR ALL TO user_service
    USING (true);

-- analytics_service sees only active users.
CREATE POLICY analytics_read_policy ON user_service.users
    FOR SELECT TO analytics_service
    USING (status = 'active');

分布式事务处理

Saga模式

Saga模式通过一系列本地事务来实现分布式事务。

// Saga模式实现
package saga

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// SagaStep is one local transaction of a saga together with its undo action.
type SagaStep struct {
    Name        string
    // Execute performs the step; its result becomes the data passed to the
    // next step.
    Execute     func(ctx context.Context, data interface{}) (interface{}, error)
    // Compensate undoes the step after a later step fails; nil means there is
    // nothing to undo.
    Compensate  func(ctx context.Context, data interface{}) error
    // RetryPolicy controls retries of Execute; nil means a single attempt.
    RetryPolicy *RetryPolicy
}

// RetryPolicy configures exponential backoff for SagaStep.Execute.
type RetryPolicy struct {
    // MaxAttempts is the total number of attempts, including the first.
    MaxAttempts int
    // Delay is the wait before the first retry.
    Delay       time.Duration
    // Multiplier scales the wait after each retry.
    Multiplier  float64
}

// SagaExecution records the progress of one saga run.
type SagaExecution struct {
    ID           string
    Steps        []*SagaStep
    // CurrentStep is the index of the step being (or last) executed.
    CurrentStep  int
    // Data is the value threaded through the steps; it holds the latest
    // step result.
    Data         interface{}
    Status       SagaStatus
    StartTime    time.Time
    // EndTime is nil until the saga completes or finishes compensating.
    EndTime      *time.Time
    // Error is the step failure that triggered compensation, if any.
    Error        error
    // CompletedSteps lists the indices of steps that succeeded, in order.
    CompletedSteps []int
    // mutex guards the fields above while the saga runs.
    mutex        sync.RWMutex
}

// SagaStatus is the lifecycle state of a SagaExecution.
type SagaStatus int

const (
    // SagaStatusPending: registered but not started.
    SagaStatusPending SagaStatus = iota
    // SagaStatusRunning: executing steps.
    SagaStatusRunning
    // SagaStatusCompleted: every step succeeded.
    SagaStatusCompleted
    // SagaStatusFailed: a step failed (compensation follows).
    SagaStatusFailed
    // SagaStatusCompensating: undoing completed steps.
    SagaStatusCompensating
    // SagaStatusCompensated: all completed steps were undone.
    SagaStatusCompensated
)

// String implements fmt.Stringer. It returns the upper-case status name, or
// "UNKNOWN" for values outside the declared constants.
func (s SagaStatus) String() string {
    name := "UNKNOWN"
    switch s {
    case SagaStatusPending:
        name = "PENDING"
    case SagaStatusRunning:
        name = "RUNNING"
    case SagaStatusCompleted:
        name = "COMPLETED"
    case SagaStatusFailed:
        name = "FAILED"
    case SagaStatusCompensating:
        name = "COMPENSATING"
    case SagaStatusCompensated:
        name = "COMPENSATED"
    }
    return name
}

// SagaOrchestrator runs sagas step by step and keeps every execution, keyed
// by saga ID, for later inspection. Nothing in this listing removes entries
// from executions.
type SagaOrchestrator struct {
    executions map[string]*SagaExecution
    mutex      sync.RWMutex
}

// NewSagaOrchestrator returns an orchestrator with no recorded executions.
func NewSagaOrchestrator() *SagaOrchestrator {
    so := new(SagaOrchestrator)
    so.executions = map[string]*SagaExecution{}
    return so
}

// Execute registers a new execution under sagaID and runs steps against data
// synchronously. Reusing a sagaID replaces the earlier record. It returns nil
// on success. When a step fails, it returns the error reported by
// compensation.
func (so *SagaOrchestrator) Execute(ctx context.Context, sagaID string, steps []*SagaStep, data interface{}) error {
    execution := &SagaExecution{
        ID:             sagaID,
        Steps:          steps,
        Data:           data,
        Status:         SagaStatusPending,
        StartTime:      time.Now(),
        CompletedSteps: []int{},
    }

    func() {
        so.mutex.Lock()
        defer so.mutex.Unlock()
        so.executions[sagaID] = execution
    }()

    return so.executeSteps(ctx, execution)
}

// executeSteps runs the saga's steps in order, threading each step's result
// into the next. On the first failure it records the error, marks the saga
// failed and hands over to compensate, whose result it returns. When every
// step succeeds the saga is marked completed.
func (so *SagaOrchestrator) executeSteps(ctx context.Context, execution *SagaExecution) error {
    execution.mutex.Lock()
    execution.Status = SagaStatusRunning
    execution.mutex.Unlock()

    for idx := 0; idx < len(execution.Steps); idx++ {
        step := execution.Steps[idx]

        execution.mutex.Lock()
        execution.CurrentStep = idx
        execution.mutex.Unlock()

        log.Printf("Executing saga step %d: %s", idx, step.Name)

        output, stepErr := so.executeStepWithRetry(ctx, step, execution.Data)
        if stepErr != nil {
            log.Printf("Saga step %d failed: %v", idx, stepErr)

            execution.mutex.Lock()
            execution.Status, execution.Error = SagaStatusFailed, stepErr
            execution.mutex.Unlock()

            // Undo the steps that already succeeded, newest first.
            return so.compensate(ctx, execution)
        }

        execution.mutex.Lock()
        execution.Data = output
        execution.CompletedSteps = append(execution.CompletedSteps, idx)
        execution.mutex.Unlock()

        log.Printf("Saga step %d completed: %s", idx, step.Name)
    }

    execution.mutex.Lock()
    execution.Status = SagaStatusCompleted
    finished := time.Now()
    execution.EndTime = &finished
    execution.mutex.Unlock()

    log.Printf("Saga %s completed successfully", execution.ID)
    return nil
}

// executeStepWithRetry runs step.Execute and retries failures according to
// step.RetryPolicy. It returns the first successful result.
//
// Without a RetryPolicy the step runs exactly once. With one, it runs up to
// MaxAttempts times. It waits Delay before the first retry and multiplies the
// wait by Multiplier after each retry.
//
// The step always runs at least once. A MaxAttempts <= 0 used to run it zero
// times and return an error wrapping nil.
//
// Waits between attempts are abandoned when ctx is cancelled. The returned
// error then wraps ctx.Err().
func (so *SagaOrchestrator) executeStepWithRetry(ctx context.Context, step *SagaStep, data interface{}) (interface{}, error) {
    maxAttempts := 1
    delay := time.Second
    multiplier := 2.0

    if step.RetryPolicy != nil {
        maxAttempts = step.RetryPolicy.MaxAttempts
        delay = step.RetryPolicy.Delay
        multiplier = step.RetryPolicy.Multiplier
    }
    if maxAttempts < 1 {
        maxAttempts = 1
    }

    var lastErr error
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        result, err := step.Execute(ctx, data)
        if err == nil {
            return result, nil
        }
        lastErr = err

        if attempt == maxAttempts {
            break
        }

        log.Printf("Step %s attempt %d failed: %v, retrying in %v", step.Name, attempt, err, delay)
        timer := time.NewTimer(delay)
        select {
        case <-ctx.Done():
            timer.Stop()
            return nil, fmt.Errorf("step %s aborted after %d attempts (last error: %v): %w", step.Name, attempt, lastErr, ctx.Err())
        case <-timer.C:
        }
        delay = time.Duration(float64(delay) * multiplier)
    }

    return nil, fmt.Errorf("step %s failed after %d attempts: %w", step.Name, maxAttempts, lastErr)
}

// compensate undoes a failed saga. It calls Compensate on every completed step
// in reverse completion order, passing the saga's current Data. Steps with a
// nil Compensate are skipped.
//
// On success the execution ends in SagaStatusCompensated with EndTime set, and
// the original step failure (execution.Error) is returned.
//
// If a compensation itself fails, the remaining compensations are not
// attempted, because manual intervention is needed. The execution is marked
// SagaStatusFailed with EndTime set, and the compensation error is returned.
// It was previously left in SagaStatusCompensating indefinitely.
func (so *SagaOrchestrator) compensate(ctx context.Context, execution *SagaExecution) error {
    execution.mutex.Lock()
    execution.Status = SagaStatusCompensating
    execution.mutex.Unlock()

    log.Printf("Starting compensation for saga %s", execution.ID)

    for i := len(execution.CompletedSteps) - 1; i >= 0; i-- {
        stepIndex := execution.CompletedSteps[i]
        step := execution.Steps[stepIndex]
        if step.Compensate == nil {
            continue
        }

        log.Printf("Compensating step %d: %s", stepIndex, step.Name)

        if err := step.Compensate(ctx, execution.Data); err != nil {
            log.Printf("Compensation failed for step %d: %v", stepIndex, err)

            // Record a terminal state so status queries do not report an
            // in-progress compensation forever.
            execution.mutex.Lock()
            execution.Status = SagaStatusFailed
            now := time.Now()
            execution.EndTime = &now
            execution.mutex.Unlock()

            return fmt.Errorf("compensation failed for step %s: %w", step.Name, err)
        }

        log.Printf("Compensation completed for step %d: %s", stepIndex, step.Name)
    }

    execution.mutex.Lock()
    execution.Status = SagaStatusCompensated
    now := time.Now()
    execution.EndTime = &now
    execution.mutex.Unlock()

    log.Printf("Compensation completed for saga %s", execution.ID)
    return execution.Error
}

// GetExecution returns the live execution record for sagaID and whether one
// exists. The pointer is shared with the running saga, so read its fields
// under its mutex.
func (so *SagaOrchestrator) GetExecution(sagaID string) (*SagaExecution, bool) {
    so.mutex.RLock()
    defer so.mutex.RUnlock()

    if execution, found := so.executions[sagaID]; found {
        return execution, true
    }
    return nil, false
}

// GetStatus reports the current status of sagaID. For an unknown saga it
// returns (SagaStatusPending, false).
func (so *SagaOrchestrator) GetStatus(sagaID string) (SagaStatus, bool) {
    if execution, found := so.GetExecution(sagaID); found {
        execution.mutex.RLock()
        defer execution.mutex.RUnlock()
        return execution.Status, true
    }
    return SagaStatusPending, false
}

// Example: order-processing saga.

// OrderData is the state threaded through the order saga's steps.
type OrderData struct {
    OrderID     string
    UserID      string
    ProductID   string
    Quantity    int
    Amount      float64
    // PaymentID and InventoryID are filled in by the ProcessPayment and
    // ReserveInventory steps respectively.
    PaymentID   string
    InventoryID string
}

// CreateOrderSaga returns the four-step order workflow:
//  1. ValidateOrder: rejects a non-positive Quantity.
//  2. ReserveInventory: sets InventoryID.
//  3. ProcessPayment: sets PaymentID.
//  4. CreateOrder.
//
// Every step expects a *OrderData (other types panic) and returns the same
// pointer. ReserveInventory and ProcessPayment allow up to three attempts,
// waiting 1s and then 2s between them. All bodies are simulations that only
// log and fill in IDs.
func CreateOrderSaga() []*SagaStep {
    threeAttempts := func() *RetryPolicy {
        return &RetryPolicy{MaxAttempts: 3, Delay: time.Second, Multiplier: 2.0}
    }

    validate := &SagaStep{
        Name: "ValidateOrder",
        Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
            order := data.(*OrderData)
            log.Printf("Validating order %s", order.OrderID)
            // Simulated validation.
            if order.Quantity > 0 {
                return order, nil
            }
            return nil, fmt.Errorf("invalid quantity: %d", order.Quantity)
        },
        Compensate: func(ctx context.Context, data interface{}) error {
            // Validation has no side effect to undo.
            return nil
        },
    }

    reserve := &SagaStep{
        Name: "ReserveInventory",
        Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
            order := data.(*OrderData)
            log.Printf("Reserving inventory for order %s", order.OrderID)
            order.InventoryID = fmt.Sprintf("inv_%s", order.OrderID) // simulated reservation
            return order, nil
        },
        Compensate: func(ctx context.Context, data interface{}) error {
            order := data.(*OrderData)
            log.Printf("Releasing inventory reservation %s", order.InventoryID)
            return nil // simulated release
        },
        RetryPolicy: threeAttempts(),
    }

    pay := &SagaStep{
        Name: "ProcessPayment",
        Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
            order := data.(*OrderData)
            log.Printf("Processing payment for order %s", order.OrderID)
            order.PaymentID = fmt.Sprintf("pay_%s", order.OrderID) // simulated charge
            return order, nil
        },
        Compensate: func(ctx context.Context, data interface{}) error {
            order := data.(*OrderData)
            log.Printf("Refunding payment %s", order.PaymentID)
            return nil // simulated refund
        },
        RetryPolicy: threeAttempts(),
    }

    create := &SagaStep{
        Name: "CreateOrder",
        Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
            order := data.(*OrderData)
            log.Printf("Creating order %s", order.OrderID)
            return order, nil // simulated creation
        },
        Compensate: func(ctx context.Context, data interface{}) error {
            order := data.(*OrderData)
            log.Printf("Canceling order %s", order.OrderID)
            return nil // simulated cancellation
        },
    }

    return []*SagaStep{validate, reserve, pay, create}
}

// main runs the example order saga once and logs its outcome and final status.
func main() {
    const sagaID = "saga_order_123"

    orchestrator := NewSagaOrchestrator()
    order := &OrderData{
        OrderID:   "order_123",
        UserID:    "user_456",
        ProductID: "product_789",
        Quantity:  2,
        Amount:    99.99,
    }

    if err := orchestrator.Execute(context.Background(), sagaID, CreateOrderSaga(), order); err != nil {
        log.Printf("Saga execution failed: %v", err)
    }

    // Report the final state recorded by the orchestrator.
    if status, ok := orchestrator.GetStatus(sagaID); ok {
        log.Printf("Saga status: %s", status)
    }
}