概述
在微服务架构中,数据管理是最具挑战性的问题之一。与单体应用不同,微服务需要处理分布式数据存储、数据一致性、事务管理等复杂问题。本章将深入探讨微服务架构中的数据管理策略和一致性保证机制。
数据库设计模式
每服务一个数据库模式
每个微服务拥有自己的数据库,确保服务间的数据隔离。
# 用户服务数据库配置
apiVersion: v1
kind: ConfigMap
metadata:
name: user-db-config
namespace: microservices
data:
database.yml: |
production:
adapter: postgresql
host: user-postgres
port: 5432
database: user_service
username: user_service
password: ${DB_PASSWORD}
pool: 20
timeout: 5000
encoding: unicode
schema_search_path: "user_service,shared,public"
migrations:
enabled: true
path: /app/db/migrations
connection_pool:
initial_size: 5
max_size: 20
checkout_timeout: 5000
idle_timeout: 300000
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: user-postgres
namespace: microservices
spec:
serviceName: user-postgres
replicas: 1
selector:
matchLabels:
app: user-postgres
template:
metadata:
labels:
app: user-postgres
spec:
containers:
- name: postgres
image: postgres:15
env:
- name: POSTGRES_DB
value: user_service
- name: POSTGRES_USER
value: user_service
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: user-db-secret
key: password
- name: PGDATA
value: /var/lib/postgresql/data/pgdata
ports:
- containerPort: 5432
name: postgres
volumeMounts:
- name: postgres-storage
mountPath: /var/lib/postgresql/data
- name: init-scripts
mountPath: /docker-entrypoint-initdb.d
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"
livenessProbe:
exec:
command:
- pg_isready
- -U
- user_service
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
exec:
command:
- pg_isready
- -U
- user_service
initialDelaySeconds: 5
periodSeconds: 5
volumes:
- name: init-scripts
configMap:
name: user-db-init
volumeClaimTemplates:
- metadata:
name: postgres-storage
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 10Gi
---
apiVersion: v1
kind: Service
metadata:
name: user-postgres
namespace: microservices
spec:
selector:
app: user-postgres
ports:
- port: 5432
targetPort: 5432
clusterIP: None
-- 用户服务数据库初始化脚本
-- init-user-db.sql
-- 创建用户表
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
first_name VARCHAR(50),
last_name VARCHAR(50),
phone VARCHAR(20),
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
version INTEGER DEFAULT 1
);
-- 创建用户配置表
CREATE TABLE IF NOT EXISTS user_profiles (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
avatar_url VARCHAR(255),
bio TEXT,
location VARCHAR(100),
website VARCHAR(255),
birth_date DATE,
gender VARCHAR(10),
preferences JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- 创建用户会话表
CREATE TABLE IF NOT EXISTS user_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
session_token VARCHAR(255) UNIQUE NOT NULL,
refresh_token VARCHAR(255),
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
last_accessed_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
ip_address INET,
user_agent TEXT
);
-- 创建索引
CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
CREATE INDEX IF NOT EXISTS idx_users_username ON users(username);
CREATE INDEX IF NOT EXISTS idx_users_status ON users(status);
CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at);
CREATE INDEX IF NOT EXISTS idx_user_profiles_user_id ON user_profiles(user_id);
CREATE INDEX IF NOT EXISTS idx_user_sessions_user_id ON user_sessions(user_id);
CREATE INDEX IF NOT EXISTS idx_user_sessions_token ON user_sessions(session_token);
CREATE INDEX IF NOT EXISTS idx_user_sessions_expires_at ON user_sessions(expires_at);
-- 创建更新时间触发器
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_users_updated_at BEFORE UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
CREATE TRIGGER update_user_profiles_updated_at BEFORE UPDATE ON user_profiles
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
-- 创建版本控制触发器
CREATE OR REPLACE FUNCTION increment_version()
RETURNS TRIGGER AS $$
BEGIN
NEW.version = OLD.version + 1;
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER increment_users_version BEFORE UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION increment_version();
-- 插入示例数据
INSERT INTO users (username, email, password_hash, first_name, last_name) VALUES
('admin', 'admin@example.com', '$2a$10$N9qo8uLOickgx2ZMRZoMye', 'Admin', 'User'),
('john_doe', 'john@example.com', '$2a$10$N9qo8uLOickgx2ZMRZoMye', 'John', 'Doe'),
('jane_smith', 'jane@example.com', '$2a$10$N9qo8uLOickgx2ZMRZoMye', 'Jane', 'Smith')
ON CONFLICT (username) DO NOTHING;
// Go语言数据访问层实现
package repository
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)
// User 用户实体
type User struct {
ID uuid.UUID `json:"id" db:"id"`
Username string `json:"username" db:"username"`
Email string `json:"email" db:"email"`
PasswordHash string `json:"-" db:"password_hash"`
FirstName *string `json:"first_name" db:"first_name"`
LastName *string `json:"last_name" db:"last_name"`
Phone *string `json:"phone" db:"phone"`
Status string `json:"status" db:"status"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
Version int `json:"version" db:"version"`
}
// UserProfile 用户配置
type UserProfile struct {
ID uuid.UUID `json:"id" db:"id"`
UserID uuid.UUID `json:"user_id" db:"user_id"`
AvatarURL *string `json:"avatar_url" db:"avatar_url"`
Bio *string `json:"bio" db:"bio"`
Location *string `json:"location" db:"location"`
Website *string `json:"website" db:"website"`
BirthDate *time.Time `json:"birth_date" db:"birth_date"`
Gender *string `json:"gender" db:"gender"`
Preferences map[string]interface{} `json:"preferences" db:"preferences"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
}
// UserRepository 用户仓储接口
type UserRepository interface {
Create(ctx context.Context, user *User) error
GetByID(ctx context.Context, id uuid.UUID) (*User, error)
GetByEmail(ctx context.Context, email string) (*User, error)
GetByUsername(ctx context.Context, username string) (*User, error)
Update(ctx context.Context, user *User) error
Delete(ctx context.Context, id uuid.UUID) error
List(ctx context.Context, limit, offset int) ([]*User, error)
Count(ctx context.Context) (int64, error)
}
// PostgresUserRepository PostgreSQL用户仓储实现
type PostgresUserRepository struct {
db *sqlx.DB
}
func NewPostgresUserRepository(db *sqlx.DB) UserRepository {
return &PostgresUserRepository{db: db}
}
func (r *PostgresUserRepository) Create(ctx context.Context, user *User) error {
query := `
INSERT INTO users (username, email, password_hash, first_name, last_name, phone, status)
VALUES (:username, :email, :password_hash, :first_name, :last_name, :phone, :status)
RETURNING id, created_at, updated_at, version
`
rows, err := r.db.NamedQueryContext(ctx, query, user)
if err != nil {
return fmt.Errorf("failed to create user: %w", err)
}
defer rows.Close()
if rows.Next() {
err = rows.Scan(&user.ID, &user.CreatedAt, &user.UpdatedAt, &user.Version)
if err != nil {
return fmt.Errorf("failed to scan user: %w", err)
}
}
return nil
}
func (r *PostgresUserRepository) GetByID(ctx context.Context, id uuid.UUID) (*User, error) {
query := `
SELECT id, username, email, password_hash, first_name, last_name, phone, status,
created_at, updated_at, version
FROM users
WHERE id = $1 AND status != 'deleted'
`
var user User
err := r.db.GetContext(ctx, &user, query, id)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("user not found: %w", err)
}
return nil, fmt.Errorf("failed to get user: %w", err)
}
return &user, nil
}
func (r *PostgresUserRepository) GetByEmail(ctx context.Context, email string) (*User, error) {
query := `
SELECT id, username, email, password_hash, first_name, last_name, phone, status,
created_at, updated_at, version
FROM users
WHERE email = $1 AND status != 'deleted'
`
var user User
err := r.db.GetContext(ctx, &user, query, email)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("user not found: %w", err)
}
return nil, fmt.Errorf("failed to get user: %w", err)
}
return &user, nil
}
func (r *PostgresUserRepository) GetByUsername(ctx context.Context, username string) (*User, error) {
query := `
SELECT id, username, email, password_hash, first_name, last_name, phone, status,
created_at, updated_at, version
FROM users
WHERE username = $1 AND status != 'deleted'
`
var user User
err := r.db.GetContext(ctx, &user, query, username)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("user not found: %w", err)
}
return nil, fmt.Errorf("failed to get user: %w", err)
}
return &user, nil
}
func (r *PostgresUserRepository) Update(ctx context.Context, user *User) error {
query := `
UPDATE users
SET username = :username, email = :email, password_hash = :password_hash,
first_name = :first_name, last_name = :last_name, phone = :phone, status = :status
WHERE id = :id AND version = :version
RETURNING updated_at, version
`
rows, err := r.db.NamedQueryContext(ctx, query, user)
if err != nil {
return fmt.Errorf("failed to update user: %w", err)
}
defer rows.Close()
if rows.Next() {
err = rows.Scan(&user.UpdatedAt, &user.Version)
if err != nil {
return fmt.Errorf("failed to scan updated user: %w", err)
}
} else {
return fmt.Errorf("user not found or version mismatch")
}
return nil
}
func (r *PostgresUserRepository) Delete(ctx context.Context, id uuid.UUID) error {
query := `UPDATE users SET status = 'deleted' WHERE id = $1`
result, err := r.db.ExecContext(ctx, query, id)
if err != nil {
return fmt.Errorf("failed to delete user: %w", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}
if rowsAffected == 0 {
return fmt.Errorf("user not found")
}
return nil
}
func (r *PostgresUserRepository) List(ctx context.Context, limit, offset int) ([]*User, error) {
query := `
SELECT id, username, email, password_hash, first_name, last_name, phone, status,
created_at, updated_at, version
FROM users
WHERE status != 'deleted'
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
`
var users []*User
err := r.db.SelectContext(ctx, &users, query, limit, offset)
if err != nil {
return nil, fmt.Errorf("failed to list users: %w", err)
}
return users, nil
}
func (r *PostgresUserRepository) Count(ctx context.Context) (int64, error) {
query := `SELECT COUNT(*) FROM users WHERE status != 'deleted'`
var count int64
err := r.db.GetContext(ctx, &count, query)
if err != nil {
return 0, fmt.Errorf("failed to count users: %w", err)
}
return count, nil
}
// DatabaseManager 数据库管理器
type DatabaseManager struct {
db *sqlx.DB
}
func NewDatabaseManager(databaseURL string) (*DatabaseManager, error) {
db, err := sqlx.Connect("postgres", databaseURL)
if err != nil {
return nil, fmt.Errorf("failed to connect to database: %w", err)
}
// 配置连接池
db.SetMaxOpenConns(20)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(time.Hour)
return &DatabaseManager{db: db}, nil
}
func (dm *DatabaseManager) Close() error {
return dm.db.Close()
}
func (dm *DatabaseManager) Health(ctx context.Context) error {
return dm.db.PingContext(ctx)
}
func (dm *DatabaseManager) GetDB() *sqlx.DB {
return dm.db
}
// Transaction 事务管理
func (dm *DatabaseManager) WithTransaction(ctx context.Context, fn func(*sqlx.Tx) error) error {
tx, err := dm.db.BeginTxx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
if p := recover(); p != nil {
tx.Rollback()
panic(p)
} else if err != nil {
tx.Rollback()
} else {
err = tx.Commit()
}
}()
err = fn(tx)
return err
}
两阶段提交(2PC)
两阶段提交协议确保分布式事务的原子性。
// 两阶段提交实现
package twopc
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// TransactionManager 事务管理器
type TransactionManager struct {
participants map[string]Participant
transactions map[string]*Transaction
mutex sync.RWMutex
}
// Participant 参与者接口
type Participant interface {
Prepare(ctx context.Context, txID string, data interface{}) error
Commit(ctx context.Context, txID string) error
Abort(ctx context.Context, txID string) error
}
// Transaction 事务状态
type Transaction struct {
ID string
Status TransactionStatus
Participants []string
Data interface{}
StartTime time.Time
EndTime *time.Time
mutex sync.RWMutex
}
// TransactionStatus 事务状态
type TransactionStatus int
const (
TxStatusPending TransactionStatus = iota
TxStatusPreparing
TxStatusPrepared
TxStatusCommitting
TxStatusCommitted
TxStatusAborting
TxStatusAborted
)
func NewTransactionManager() *TransactionManager {
return &TransactionManager{
participants: make(map[string]Participant),
transactions: make(map[string]*Transaction),
}
}
func (tm *TransactionManager) RegisterParticipant(name string, participant Participant) {
tm.mutex.Lock()
defer tm.mutex.Unlock()
tm.participants[name] = participant
}
func (tm *TransactionManager) BeginTransaction(txID string, participantNames []string, data interface{}) error {
tm.mutex.Lock()
defer tm.mutex.Unlock()
if _, exists := tm.transactions[txID]; exists {
return fmt.Errorf("transaction %s already exists", txID)
}
// 验证所有参与者都已注册
for _, name := range participantNames {
if _, exists := tm.participants[name]; !exists {
return fmt.Errorf("participant %s not registered", name)
}
}
tx := &Transaction{
ID: txID,
Status: TxStatusPending,
Participants: participantNames,
Data: data,
StartTime: time.Now(),
}
tm.transactions[txID] = tx
return nil
}
func (tm *TransactionManager) CommitTransaction(ctx context.Context, txID string) error {
tm.mutex.RLock()
tx, exists := tm.transactions[txID]
tm.mutex.RUnlock()
if !exists {
return fmt.Errorf("transaction %s not found", txID)
}
// 阶段1:准备阶段
tx.mutex.Lock()
tx.Status = TxStatusPreparing
tx.mutex.Unlock()
log.Printf("Starting prepare phase for transaction %s", txID)
for _, participantName := range tx.Participants {
participant := tm.participants[participantName]
err := participant.Prepare(ctx, txID, tx.Data)
if err != nil {
log.Printf("Participant %s failed to prepare: %v", participantName, err)
return tm.abortTransaction(ctx, txID)
}
log.Printf("Participant %s prepared successfully", participantName)
}
tx.mutex.Lock()
tx.Status = TxStatusPrepared
tx.mutex.Unlock()
log.Printf("All participants prepared for transaction %s", txID)
// 阶段2:提交阶段
tx.mutex.Lock()
tx.Status = TxStatusCommitting
tx.mutex.Unlock()
log.Printf("Starting commit phase for transaction %s", txID)
for _, participantName := range tx.Participants {
participant := tm.participants[participantName]
err := participant.Commit(ctx, txID)
if err != nil {
log.Printf("Participant %s failed to commit: %v", participantName, err)
// 提交阶段失败,需要人工干预
return fmt.Errorf("commit failed for participant %s: %w", participantName, err)
}
log.Printf("Participant %s committed successfully", participantName)
}
tx.mutex.Lock()
tx.Status = TxStatusCommitted
now := time.Now()
tx.EndTime = &now
tx.mutex.Unlock()
log.Printf("Transaction %s committed successfully", txID)
return nil
}
func (tm *TransactionManager) abortTransaction(ctx context.Context, txID string) error {
tm.mutex.RLock()
tx, exists := tm.transactions[txID]
tm.mutex.RUnlock()
if !exists {
return fmt.Errorf("transaction %s not found", txID)
}
tx.mutex.Lock()
tx.Status = TxStatusAborting
tx.mutex.Unlock()
log.Printf("Aborting transaction %s", txID)
for _, participantName := range tx.Participants {
participant := tm.participants[participantName]
err := participant.Abort(ctx, txID)
if err != nil {
log.Printf("Participant %s failed to abort: %v", participantName, err)
}
}
tx.mutex.Lock()
tx.Status = TxStatusAborted
now := time.Now()
tx.EndTime = &now
tx.mutex.Unlock()
log.Printf("Transaction %s aborted", txID)
return fmt.Errorf("transaction aborted")
}
// 数据库参与者实现
type DatabaseParticipant struct {
name string
db interface{} // 数据库连接
preparedTx map[string]interface{} // 准备好的事务
mutex sync.RWMutex
}
func NewDatabaseParticipant(name string, db interface{}) *DatabaseParticipant {
return &DatabaseParticipant{
name: name,
db: db,
preparedTx: make(map[string]interface{}),
}
}
func (dp *DatabaseParticipant) Prepare(ctx context.Context, txID string, data interface{}) error {
log.Printf("[%s] Preparing transaction %s", dp.name, txID)
// 模拟数据库准备操作
// 在实际实现中,这里会开始数据库事务并执行操作,但不提交
dp.mutex.Lock()
dp.preparedTx[txID] = data
dp.mutex.Unlock()
log.Printf("[%s] Transaction %s prepared", dp.name, txID)
return nil
}
func (dp *DatabaseParticipant) Commit(ctx context.Context, txID string) error {
log.Printf("[%s] Committing transaction %s", dp.name, txID)
dp.mutex.Lock()
defer dp.mutex.Unlock()
if _, exists := dp.preparedTx[txID]; !exists {
return fmt.Errorf("transaction %s not prepared", txID)
}
// 模拟数据库提交操作
delete(dp.preparedTx, txID)
log.Printf("[%s] Transaction %s committed", dp.name, txID)
return nil
}
func (dp *DatabaseParticipant) Abort(ctx context.Context, txID string) error {
log.Printf("[%s] Aborting transaction %s", dp.name, txID)
dp.mutex.Lock()
defer dp.mutex.Unlock()
// 模拟数据库回滚操作
delete(dp.preparedTx, txID)
log.Printf("[%s] Transaction %s aborted", dp.name, txID)
return nil
}
CQRS模式
CQRS(Command Query Responsibility Segregation)将读写操作分离,提高系统性能和可扩展性。
// CQRS模式实现
package cqrs
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/google/uuid"
)
// Command 命令接口
type Command interface {
GetID() string
GetType() string
GetData() interface{}
}
// Query 查询接口
type Query interface {
GetID() string
GetType() string
GetCriteria() interface{}
}
// Event 事件接口
type Event interface {
GetID() string
GetType() string
GetAggregateID() string
GetData() interface{}
GetTimestamp() time.Time
}
// CommandHandler 命令处理器接口
type CommandHandler interface {
Handle(ctx context.Context, cmd Command) ([]Event, error)
}
// QueryHandler 查询处理器接口
type QueryHandler interface {
Handle(ctx context.Context, query Query) (interface{}, error)
}
// EventHandler 事件处理器接口
type EventHandler interface {
Handle(ctx context.Context, event Event) error
}
// CommandBus 命令总线
type CommandBus struct {
handlers map[string]CommandHandler
mutex sync.RWMutex
}
func NewCommandBus() *CommandBus {
return &CommandBus{
handlers: make(map[string]CommandHandler),
}
}
func (cb *CommandBus) RegisterHandler(commandType string, handler CommandHandler) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
cb.handlers[commandType] = handler
}
func (cb *CommandBus) Execute(ctx context.Context, cmd Command) ([]Event, error) {
cb.mutex.RLock()
handler, exists := cb.handlers[cmd.GetType()]
cb.mutex.RUnlock()
if !exists {
return nil, fmt.Errorf("no handler found for command type: %s", cmd.GetType())
}
log.Printf("Executing command %s of type %s", cmd.GetID(), cmd.GetType())
return handler.Handle(ctx, cmd)
}
// QueryBus 查询总线
type QueryBus struct {
handlers map[string]QueryHandler
mutex sync.RWMutex
}
func NewQueryBus() *QueryBus {
return &QueryBus{
handlers: make(map[string]QueryHandler),
}
}
func (qb *QueryBus) RegisterHandler(queryType string, handler QueryHandler) {
qb.mutex.Lock()
defer qb.mutex.Unlock()
qb.handlers[queryType] = handler
}
func (qb *QueryBus) Execute(ctx context.Context, query Query) (interface{}, error) {
qb.mutex.RLock()
handler, exists := qb.handlers[query.GetType()]
qb.mutex.RUnlock()
if !exists {
return nil, fmt.Errorf("no handler found for query type: %s", query.GetType())
}
log.Printf("Executing query %s of type %s", query.GetID(), query.GetType())
return handler.Handle(ctx, query)
}
// EventBus 事件总线
type EventBus struct {
handlers map[string][]EventHandler
mutex sync.RWMutex
}
func NewEventBus() *EventBus {
return &EventBus{
handlers: make(map[string][]EventHandler),
}
}
func (eb *EventBus) RegisterHandler(eventType string, handler EventHandler) {
eb.mutex.Lock()
defer eb.mutex.Unlock()
if _, exists := eb.handlers[eventType]; !exists {
eb.handlers[eventType] = make([]EventHandler, 0)
}
eb.handlers[eventType] = append(eb.handlers[eventType], handler)
}
func (eb *EventBus) Publish(ctx context.Context, events []Event) error {
for _, event := range events {
eb.mutex.RLock()
handlers, exists := eb.handlers[event.GetType()]
eb.mutex.RUnlock()
if !exists {
log.Printf("No handlers found for event type: %s", event.GetType())
continue
}
log.Printf("Publishing event %s of type %s", event.GetID(), event.GetType())
for _, handler := range handlers {
go func(h EventHandler, e Event) {
if err := h.Handle(ctx, e); err != nil {
log.Printf("Error handling event %s: %v", e.GetID(), err)
}
}(handler, event)
}
}
return nil
}
// 具体实现示例
// CreateUserCommand 创建用户命令
type CreateUserCommand struct {
ID string
Username string
Email string
Password string
}
func (c *CreateUserCommand) GetID() string {
return c.ID
}
func (c *CreateUserCommand) GetType() string {
return "CreateUser"
}
func (c *CreateUserCommand) GetData() interface{} {
return c
}
// UserCreatedEvent 用户创建事件
type UserCreatedEvent struct {
ID string
AggregateID string
UserID string
Username string
Email string
Timestamp time.Time
}
func (e *UserCreatedEvent) GetID() string {
return e.ID
}
func (e *UserCreatedEvent) GetType() string {
return "UserCreated"
}
func (e *UserCreatedEvent) GetAggregateID() string {
return e.AggregateID
}
func (e *UserCreatedEvent) GetData() interface{} {
return e
}
func (e *UserCreatedEvent) GetTimestamp() time.Time {
return e.Timestamp
}
// CreateUserCommandHandler 创建用户命令处理器
type CreateUserCommandHandler struct {
userRepository UserWriteRepository
}
func NewCreateUserCommandHandler(repo UserWriteRepository) *CreateUserCommandHandler {
return &CreateUserCommandHandler{
userRepository: repo,
}
}
func (h *CreateUserCommandHandler) Handle(ctx context.Context, cmd Command) ([]Event, error) {
createCmd, ok := cmd.(*CreateUserCommand)
if !ok {
return nil, fmt.Errorf("invalid command type")
}
// 验证用户数据
if createCmd.Username == "" || createCmd.Email == "" {
return nil, fmt.Errorf("username and email are required")
}
// 检查用户是否已存在
exists, err := h.userRepository.ExistsByEmail(ctx, createCmd.Email)
if err != nil {
return nil, fmt.Errorf("failed to check user existence: %w", err)
}
if exists {
return nil, fmt.Errorf("user with email %s already exists", createCmd.Email)
}
// 创建用户
userID := uuid.New().String()
user := &WriteUser{
ID: userID,
Username: createCmd.Username,
Email: createCmd.Email,
Password: createCmd.Password, // 在实际应用中应该加密
Status: "active",
}
err = h.userRepository.Create(ctx, user)
if err != nil {
return nil, fmt.Errorf("failed to create user: %w", err)
}
// 生成事件
event := &UserCreatedEvent{
ID: uuid.New().String(),
AggregateID: userID,
UserID: userID,
Username: createCmd.Username,
Email: createCmd.Email,
Timestamp: time.Now(),
}
return []Event{event}, nil
}
// GetUserQuery 获取用户查询
type GetUserQuery struct {
ID string
UserID string
}
func (q *GetUserQuery) GetID() string {
return q.ID
}
func (q *GetUserQuery) GetType() string {
return "GetUser"
}
func (q *GetUserQuery) GetCriteria() interface{} {
return q
}
// GetUserQueryHandler 获取用户查询处理器
type GetUserQueryHandler struct {
userRepository UserReadRepository
}
func NewGetUserQueryHandler(repo UserReadRepository) *GetUserQueryHandler {
return &GetUserQueryHandler{
userRepository: repo,
}
}
func (h *GetUserQueryHandler) Handle(ctx context.Context, query Query) (interface{}, error) {
getUserQuery, ok := query.(*GetUserQuery)
if !ok {
return nil, fmt.Errorf("invalid query type")
}
user, err := h.userRepository.GetByID(ctx, getUserQuery.UserID)
if err != nil {
return nil, fmt.Errorf("failed to get user: %w", err)
}
return user, nil
}
// UserCreatedEventHandler 用户创建事件处理器
type UserCreatedEventHandler struct {
readRepository UserReadRepository
}
func NewUserCreatedEventHandler(repo UserReadRepository) *UserCreatedEventHandler {
return &UserCreatedEventHandler{
readRepository: repo,
}
}
func (h *UserCreatedEventHandler) Handle(ctx context.Context, event Event) error {
userCreatedEvent, ok := event.(*UserCreatedEvent)
if !ok {
return fmt.Errorf("invalid event type")
}
// 更新读模型
readUser := &ReadUser{
ID: userCreatedEvent.UserID,
Username: userCreatedEvent.Username,
Email: userCreatedEvent.Email,
Status: "active",
CreatedAt: userCreatedEvent.Timestamp,
UpdatedAt: userCreatedEvent.Timestamp,
}
err := h.readRepository.Create(ctx, readUser)
if err != nil {
return fmt.Errorf("failed to create read user: %w", err)
}
log.Printf("Read model updated for user %s", userCreatedEvent.UserID)
return nil
}
// 数据模型定义
// WriteUser 写模型用户
type WriteUser struct {
ID string
Username string
Email string
Password string
Status string
}
// ReadUser 读模型用户
type ReadUser struct {
ID string `json:"id"`
Username string `json:"username"`
Email string `json:"email"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// UserWriteRepository 用户写仓储接口
type UserWriteRepository interface {
Create(ctx context.Context, user *WriteUser) error
Update(ctx context.Context, user *WriteUser) error
Delete(ctx context.Context, id string) error
ExistsByEmail(ctx context.Context, email string) (bool, error)
}
// UserReadRepository 用户读仓储接口
type UserReadRepository interface {
GetByID(ctx context.Context, id string) (*ReadUser, error)
GetByEmail(ctx context.Context, email string) (*ReadUser, error)
List(ctx context.Context, limit, offset int) ([]*ReadUser, error)
Create(ctx context.Context, user *ReadUser) error
Update(ctx context.Context, user *ReadUser) error
}
// CQRS应用服务
type UserService struct {
commandBus *CommandBus
queryBus *QueryBus
eventBus *EventBus
}
func NewUserService(commandBus *CommandBus, queryBus *QueryBus, eventBus *EventBus) *UserService {
return &UserService{
commandBus: commandBus,
queryBus: queryBus,
eventBus: eventBus,
}
}
func (s *UserService) CreateUser(ctx context.Context, username, email, password string) error {
cmd := &CreateUserCommand{
ID: uuid.New().String(),
Username: username,
Email: email,
Password: password,
}
events, err := s.commandBus.Execute(ctx, cmd)
if err != nil {
return err
}
return s.eventBus.Publish(ctx, events)
}
func (s *UserService) GetUser(ctx context.Context, userID string) (*ReadUser, error) {
query := &GetUserQuery{
ID: uuid.New().String(),
UserID: userID,
}
result, err := s.queryBus.Execute(ctx, query)
if err != nil {
return nil, err
}
user, ok := result.(*ReadUser)
if !ok {
return nil, fmt.Errorf("invalid result type")
}
return user, nil
}
事件溯源模式
事件溯源通过存储事件序列来重建应用状态,提供完整的审计跟踪。
// 事件溯源实现
package eventsourcing
import (
"context"
"encoding/json"
"fmt"
"log"
"sort"
"sync"
"time"
"github.com/google/uuid"
)
// Event 事件接口
type Event interface {
GetEventID() string
GetEventType() string
GetAggregateID() string
GetEventData() interface{}
GetTimestamp() time.Time
GetVersion() int
}
// EventStore 事件存储接口
type EventStore interface {
SaveEvents(ctx context.Context, aggregateID string, events []Event, expectedVersion int) error
GetEvents(ctx context.Context, aggregateID string) ([]Event, error)
GetEventsFromVersion(ctx context.Context, aggregateID string, fromVersion int) ([]Event, error)
GetAllEvents(ctx context.Context, fromTimestamp time.Time) ([]Event, error)
}
// Aggregate 聚合根接口
type Aggregate interface {
GetID() string
GetVersion() int
GetUncommittedEvents() []Event
MarkEventsAsCommitted()
LoadFromHistory(events []Event)
}
// Repository 聚合仓储接口
type Repository interface {
Save(ctx context.Context, aggregate Aggregate, expectedVersion int) error
GetByID(ctx context.Context, id string) (Aggregate, error)
}
// BaseEvent 基础事件实现
type BaseEvent struct {
EventID string `json:"event_id"`
EventType string `json:"event_type"`
AggregateID string `json:"aggregate_id"`
EventData interface{} `json:"event_data"`
Timestamp time.Time `json:"timestamp"`
Version int `json:"version"`
}
func (e *BaseEvent) GetEventID() string {
return e.EventID
}
func (e *BaseEvent) GetEventType() string {
return e.EventType
}
func (e *BaseEvent) GetAggregateID() string {
return e.AggregateID
}
func (e *BaseEvent) GetEventData() interface{} {
return e.EventData
}
func (e *BaseEvent) GetTimestamp() time.Time {
return e.Timestamp
}
func (e *BaseEvent) GetVersion() int {
return e.Version
}
// InMemoryEventStore 内存事件存储实现
type InMemoryEventStore struct {
events map[string][]Event
mutex sync.RWMutex
}
func NewInMemoryEventStore() *InMemoryEventStore {
return &InMemoryEventStore{
events: make(map[string][]Event),
}
}
func (es *InMemoryEventStore) SaveEvents(ctx context.Context, aggregateID string, events []Event, expectedVersion int) error {
es.mutex.Lock()
defer es.mutex.Unlock()
existingEvents, exists := es.events[aggregateID]
currentVersion := 0
if exists {
currentVersion = len(existingEvents)
}
if expectedVersion != -1 && currentVersion != expectedVersion {
return fmt.Errorf("concurrency conflict: expected version %d, but current version is %d", expectedVersion, currentVersion)
}
// 设置事件版本
for i, event := range events {
if baseEvent, ok := event.(*BaseEvent); ok {
baseEvent.Version = currentVersion + i + 1
}
}
if !exists {
es.events[aggregateID] = make([]Event, 0)
}
es.events[aggregateID] = append(es.events[aggregateID], events...)
log.Printf("Saved %d events for aggregate %s", len(events), aggregateID)
return nil
}
func (es *InMemoryEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
es.mutex.RLock()
defer es.mutex.RUnlock()
events, exists := es.events[aggregateID]
if !exists {
return []Event{}, nil
}
// 返回副本
result := make([]Event, len(events))
copy(result, events)
return result, nil
}
func (es *InMemoryEventStore) GetEventsFromVersion(ctx context.Context, aggregateID string, fromVersion int) ([]Event, error) {
allEvents, err := es.GetEvents(ctx, aggregateID)
if err != nil {
return nil, err
}
var result []Event
for _, event := range allEvents {
if event.GetVersion() >= fromVersion {
result = append(result, event)
}
}
return result, nil
}
func (es *InMemoryEventStore) GetAllEvents(ctx context.Context, fromTimestamp time.Time) ([]Event, error) {
es.mutex.RLock()
defer es.mutex.RUnlock()
var allEvents []Event
for _, events := range es.events {
for _, event := range events {
if event.GetTimestamp().After(fromTimestamp) || event.GetTimestamp().Equal(fromTimestamp) {
allEvents = append(allEvents, event)
}
}
}
// 按时间戳排序
sort.Slice(allEvents, func(i, j int) bool {
return allEvents[i].GetTimestamp().Before(allEvents[j].GetTimestamp())
})
return allEvents, nil
}
// BaseAggregate 基础聚合实现
type BaseAggregate struct {
ID string
Version int
UncommittedEvents []Event
}
func (a *BaseAggregate) GetID() string {
return a.ID
}
func (a *BaseAggregate) GetVersion() int {
return a.Version
}
func (a *BaseAggregate) GetUncommittedEvents() []Event {
return a.UncommittedEvents
}
func (a *BaseAggregate) MarkEventsAsCommitted() {
a.Version += len(a.UncommittedEvents)
a.UncommittedEvents = []Event{}
}
func (a *BaseAggregate) ApplyEvent(event Event) {
a.UncommittedEvents = append(a.UncommittedEvents, event)
}
// EventSourcingRepository 事件溯源仓储实现
type EventSourcingRepository struct {
eventStore EventStore
aggregateType func() Aggregate
}
func NewEventSourcingRepository(eventStore EventStore, aggregateType func() Aggregate) *EventSourcingRepository {
return &EventSourcingRepository{
eventStore: eventStore,
aggregateType: aggregateType,
}
}
func (r *EventSourcingRepository) Save(ctx context.Context, aggregate Aggregate, expectedVersion int) error {
uncommittedEvents := aggregate.GetUncommittedEvents()
if len(uncommittedEvents) == 0 {
return nil
}
err := r.eventStore.SaveEvents(ctx, aggregate.GetID(), uncommittedEvents, expectedVersion)
if err != nil {
return fmt.Errorf("failed to save events: %w", err)
}
aggregate.MarkEventsAsCommitted()
return nil
}
func (r *EventSourcingRepository) GetByID(ctx context.Context, id string) (Aggregate, error) {
events, err := r.eventStore.GetEvents(ctx, id)
if err != nil {
return nil, fmt.Errorf("failed to get events: %w", err)
}
if len(events) == 0 {
return nil, fmt.Errorf("aggregate not found: %s", id)
}
aggregate := r.aggregateType()
aggregate.LoadFromHistory(events)
return aggregate, nil
}
// 用户聚合示例
// UserAggregate 用户聚合
type UserAggregate struct {
BaseAggregate
Username string
Email string
Status string
}
func NewUserAggregate(id string) *UserAggregate {
return &UserAggregate{
BaseAggregate: BaseAggregate{
ID: id,
},
}
}
func (u *UserAggregate) LoadFromHistory(events []Event) {
for _, event := range events {
u.applyEvent(event)
u.Version++
}
}
func (u *UserAggregate) CreateUser(username, email string) error {
if u.Status != "" {
return fmt.Errorf("user already exists")
}
event := &BaseEvent{
EventID: uuid.New().String(),
EventType: "UserCreated",
AggregateID: u.ID,
EventData: map[string]interface{}{
"username": username,
"email": email,
},
Timestamp: time.Now(),
}
u.applyEvent(event)
u.ApplyEvent(event)
return nil
}
func (u *UserAggregate) UpdateEmail(newEmail string) error {
if u.Status != "active" {
return fmt.Errorf("user is not active")
}
if u.Email == newEmail {
return nil // 没有变化
}
event := &BaseEvent{
EventID: uuid.New().String(),
EventType: "UserEmailUpdated",
AggregateID: u.ID,
EventData: map[string]interface{}{
"old_email": u.Email,
"new_email": newEmail,
},
Timestamp: time.Now(),
}
u.applyEvent(event)
u.ApplyEvent(event)
return nil
}
func (u *UserAggregate) DeactivateUser() error {
if u.Status != "active" {
return fmt.Errorf("user is not active")
}
event := &BaseEvent{
EventID: uuid.New().String(),
EventType: "UserDeactivated",
AggregateID: u.ID,
EventData: map[string]interface{}{},
Timestamp: time.Now(),
}
u.applyEvent(event)
u.ApplyEvent(event)
return nil
}
func (u *UserAggregate) applyEvent(event Event) {
switch event.GetEventType() {
case "UserCreated":
data := event.GetEventData().(map[string]interface{})
u.Username = data["username"].(string)
u.Email = data["email"].(string)
u.Status = "active"
case "UserEmailUpdated":
data := event.GetEventData().(map[string]interface{})
u.Email = data["new_email"].(string)
case "UserDeactivated":
u.Status = "inactive"
}
}
// 事件投影器
type EventProjector struct {
eventStore EventStore
projections map[string]func(Event) error
lastProcessedTimestamp time.Time
mutex sync.RWMutex
}
func NewEventProjector(eventStore EventStore) *EventProjector {
return &EventProjector{
eventStore: eventStore,
projections: make(map[string]func(Event) error),
}
}
func (ep *EventProjector) RegisterProjection(eventType string, handler func(Event) error) {
ep.mutex.Lock()
defer ep.mutex.Unlock()
ep.projections[eventType] = handler
}
func (ep *EventProjector) ProcessEvents(ctx context.Context) error {
ep.mutex.RLock()
fromTimestamp := ep.lastProcessedTimestamp
ep.mutex.RUnlock()
events, err := ep.eventStore.GetAllEvents(ctx, fromTimestamp)
if err != nil {
return fmt.Errorf("failed to get events: %w", err)
}
for _, event := range events {
ep.mutex.RLock()
handler, exists := ep.projections[event.GetEventType()]
ep.mutex.RUnlock()
if exists {
err := handler(event)
if err != nil {
log.Printf("Error processing event %s: %v", event.GetEventID(), err)
continue
}
}
ep.mutex.Lock()
if event.GetTimestamp().After(ep.lastProcessedTimestamp) {
ep.lastProcessedTimestamp = event.GetTimestamp()
}
ep.mutex.Unlock()
}
return nil
}
// 使用示例
func ExampleEventSourcing() {
// 创建事件存储
eventStore := NewInMemoryEventStore()
// 创建仓储
userRepo := NewEventSourcingRepository(eventStore, func() Aggregate {
return &UserAggregate{}
})
ctx := context.Background()
// 创建用户聚合
userID := uuid.New().String()
user := NewUserAggregate(userID)
// 执行业务操作
err := user.CreateUser("john_doe", "john@example.com")
if err != nil {
log.Printf("Error creating user: %v", err)
return
}
// 保存聚合
err = userRepo.Save(ctx, user, -1)
if err != nil {
log.Printf("Error saving user: %v", err)
return
}
// 重新加载聚合
loadedUser, err := userRepo.GetByID(ctx, userID)
if err != nil {
log.Printf("Error loading user: %v", err)
return
}
userAggregate := loadedUser.(*UserAggregate)
log.Printf("Loaded user: %s, %s, %s", userAggregate.Username, userAggregate.Email, userAggregate.Status)
// 更新邮箱
err = userAggregate.UpdateEmail("john.doe@example.com")
if err != nil {
log.Printf("Error updating email: %v", err)
return
}
// 保存更新
err = userRepo.Save(ctx, userAggregate, userAggregate.GetVersion())
if err != nil {
log.Printf("Error saving updated user: %v", err)
return
}
log.Printf("User updated successfully")
}
数据一致性保证
最终一致性
在分布式系统中,最终一致性是一种常见的一致性模型。
// 最终一致性实现
package consistency
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// ConsistencyLevel 一致性级别
type ConsistencyLevel int
const (
ConsistencyLevelEventual ConsistencyLevel = iota
ConsistencyLevelStrong
ConsistencyLevelWeak
)
// DataSynchronizer 数据同步器
type DataSynchronizer struct {
replicas map[string]DataReplica
mutex sync.RWMutex
config SyncConfig
}
// DataReplica 数据副本接口
type DataReplica interface {
Write(ctx context.Context, key string, value interface{}) error
Read(ctx context.Context, key string) (interface{}, error)
GetLastModified(ctx context.Context, key string) (time.Time, error)
Sync(ctx context.Context, changes []DataChange) error
}
// DataChange 数据变更
type DataChange struct {
Key string
Value interface{}
Timestamp time.Time
Version int64
}
// SyncConfig 同步配置
type SyncConfig struct {
SyncInterval time.Duration
RetryAttempts int
RetryDelay time.Duration
ConflictResolver func(local, remote DataChange) DataChange
}
func NewDataSynchronizer(config SyncConfig) *DataSynchronizer {
return &DataSynchronizer{
replicas: make(map[string]DataReplica),
config: config,
}
}
func (ds *DataSynchronizer) RegisterReplica(name string, replica DataReplica) {
ds.mutex.Lock()
defer ds.mutex.Unlock()
ds.replicas[name] = replica
}
func (ds *DataSynchronizer) WriteWithEventualConsistency(ctx context.Context, key string, value interface{}) error {
ds.mutex.RLock()
replicas := make([]DataReplica, 0, len(ds.replicas))
for _, replica := range ds.replicas {
replicas = append(replicas, replica)
}
ds.mutex.RUnlock()
// 异步写入所有副本
var wg sync.WaitGroup
errors := make(chan error, len(replicas))
for _, replica := range replicas {
wg.Add(1)
go func(r DataReplica) {
defer wg.Done()
for attempt := 0; attempt < ds.config.RetryAttempts; attempt++ {
err := r.Write(ctx, key, value)
if err == nil {
return
}
log.Printf("Write attempt %d failed: %v", attempt+1, err)
if attempt < ds.config.RetryAttempts-1 {
time.Sleep(ds.config.RetryDelay)
} else {
errors <- err
}
}
}(replica)
}
wg.Wait()
close(errors)
// 检查是否有错误
var lastError error
errorCount := 0
for err := range errors {
lastError = err
errorCount++
}
// 如果大部分副本写入成功,认为操作成功
if errorCount < len(replicas)/2 {
return nil
}
return fmt.Errorf("write failed on %d replicas, last error: %w", errorCount, lastError)
}
func (ds *DataSynchronizer) ReadWithEventualConsistency(ctx context.Context, key string) (interface{}, error) {
ds.mutex.RLock()
replicas := make([]DataReplica, 0, len(ds.replicas))
for _, replica := range ds.replicas {
replicas = append(replicas, replica)
}
ds.mutex.RUnlock()
// 从第一个可用的副本读取
for _, replica := range replicas {
value, err := replica.Read(ctx, key)
if err == nil {
return value, nil
}
log.Printf("Read from replica failed: %v", err)
}
return nil, fmt.Errorf("failed to read from all replicas")
}
func (ds *DataSynchronizer) StartSynchronization(ctx context.Context) {
ticker := time.NewTicker(ds.config.SyncInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := ds.synchronizeReplicas(ctx)
if err != nil {
log.Printf("Synchronization failed: %v", err)
}
}
}
}
func (ds *DataSynchronizer) synchronizeReplicas(ctx context.Context) error {
ds.mutex.RLock()
replicas := make(map[string]DataReplica)
for name, replica := range ds.replicas {
replicas[name] = replica
}
ds.mutex.RUnlock()
// 收集所有副本的变更
allChanges := make(map[string]map[string]DataChange)
for name, replica := range replicas {
// 这里简化实现,实际应该获取自上次同步以来的变更
allChanges[name] = make(map[string]DataChange)
}
// 解决冲突并同步
for key, replicaChanges := range ds.collectChanges(allChanges) {
resolvedChange := ds.resolveConflicts(replicaChanges)
// 将解决后的变更应用到所有副本
for name, replica := range replicas {
err := replica.Sync(ctx, []DataChange{resolvedChange})
if err != nil {
log.Printf("Failed to sync change to replica %s: %v", name, err)
}
}
}
return nil
}
func (ds *DataSynchronizer) collectChanges(allChanges map[string]map[string]DataChange) map[string][]DataChange {
changesByKey := make(map[string][]DataChange)
for _, replicaChanges := range allChanges {
for key, change := range replicaChanges {
changesByKey[key] = append(changesByKey[key], change)
}
}
return changesByKey
}
func (ds *DataSynchronizer) resolveConflicts(changes []DataChange) DataChange {
if len(changes) == 0 {
return DataChange{}
}
if len(changes) == 1 {
return changes[0]
}
// 使用配置的冲突解决器
if ds.config.ConflictResolver != nil {
resolved := changes[0]
for i := 1; i < len(changes); i++ {
resolved = ds.config.ConflictResolver(resolved, changes[i])
}
return resolved
}
// 默认使用最新时间戳的变更
latest := changes[0]
for _, change := range changes[1:] {
if change.Timestamp.After(latest.Timestamp) {
latest = change
}
}
return latest
}
// 内存副本实现
type InMemoryReplica struct {
data map[string]interface{}
lastModified map[string]time.Time
mutex sync.RWMutex
name string
}
func NewInMemoryReplica(name string) *InMemoryReplica {
return &InMemoryReplica{
data: make(map[string]interface{}),
lastModified: make(map[string]time.Time),
name: name,
}
}
func (r *InMemoryReplica) Write(ctx context.Context, key string, value interface{}) error {
r.mutex.Lock()
defer r.mutex.Unlock()
r.data[key] = value
r.lastModified[key] = time.Now()
log.Printf("[%s] Written key %s", r.name, key)
return nil
}
func (r *InMemoryReplica) Read(ctx context.Context, key string) (interface{}, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
value, exists := r.data[key]
if !exists {
return nil, fmt.Errorf("key not found: %s", key)
}
log.Printf("[%s] Read key %s", r.name, key)
return value, nil
}
func (r *InMemoryReplica) GetLastModified(ctx context.Context, key string) (time.Time, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
timestamp, exists := r.lastModified[key]
if !exists {
return time.Time{}, fmt.Errorf("key not found: %s", key)
}
return timestamp, nil
}
func (r *InMemoryReplica) Sync(ctx context.Context, changes []DataChange) error {
r.mutex.Lock()
defer r.mutex.Unlock()
for _, change := range changes {
r.data[change.Key] = change.Value
r.lastModified[change.Key] = change.Timestamp
log.Printf("[%s] Synced key %s", r.name, change.Key)
}
return nil
}
使用示例
// 最终一致性使用示例
func ExampleEventualConsistency() {
// 创建同步配置
config := SyncConfig{
SyncInterval: time.Second * 30,
RetryAttempts: 3,
RetryDelay: time.Millisecond * 100,
ConflictResolver: func(local, remote DataChange) DataChange {
// 使用最新时间戳的变更
if remote.Timestamp.After(local.Timestamp) {
return remote
}
return local
},
}
// 创建数据同步器
synchronizer := NewDataSynchronizer(config)
// 注册副本
replica1 := NewInMemoryReplica("replica-1")
replica2 := NewInMemoryReplica("replica-2")
replica3 := NewInMemoryReplica("replica-3")
synchronizer.RegisterReplica("replica-1", replica1)
synchronizer.RegisterReplica("replica-2", replica2)
synchronizer.RegisterReplica("replica-3", replica3)
ctx := context.Background()
// 启动同步
go synchronizer.StartSynchronization(ctx)
// 写入数据
err := synchronizer.WriteWithEventualConsistency(ctx, "user:123", map[string]interface{}{
"name": "John Doe",
"email": "john@example.com",
})
if err != nil {
log.Printf("Write failed: %v", err)
return
}
// 等待一段时间让数据同步
time.Sleep(time.Second * 2)
// 读取数据
value, err := synchronizer.ReadWithEventualConsistency(ctx, "user:123")
if err != nil {
log.Printf("Read failed: %v", err)
return
}
log.Printf("Read value: %+v", value)
}
本章总结
本章深入探讨了微服务架构中的数据管理与一致性问题,涵盖了以下关键内容:
数据库模式
每服务一个数据库模式
- 提供了完全的数据隔离和技术栈自由
- 避免了服务间的数据耦合
- 需要处理分布式事务和数据一致性问题
共享数据库模式
- 简化了事务处理和数据一致性
- 可能导致服务间的紧耦合
- 适用于过渡期或特定场景
分布式事务处理
Saga模式
- 通过补偿操作处理长事务
- 支持编排和协调两种实现方式
- 提供了良好的可扩展性和容错性
两阶段提交(2PC)
- 保证强一致性
- 存在阻塞和单点故障风险
- 适用于对一致性要求极高的场景
架构模式
CQRS模式
- 分离命令和查询职责
- 优化读写性能
- 支持复杂的业务逻辑和报表需求
事件溯源模式
- 通过事件序列重建状态
- 提供完整的审计跟踪
- 支持时间旅行和状态回溯
数据一致性保证
- 最终一致性
- 在分布式环境中平衡性能和一致性
- 通过异步同步和冲突解决机制
- 适用于大多数业务场景
最佳实践
选择合适的一致性模型
- 根据业务需求选择强一致性或最终一致性
- 考虑性能、可用性和复杂性的权衡
设计幂等操作
- 确保重试操作的安全性
- 使用唯一标识符防止重复处理
实现补偿机制
- 为每个操作设计相应的补偿操作
- 确保系统能够从失败中恢复
监控和可观测性
- 监控事务状态和数据一致性
- 实现分布式追踪和日志聚合
下一章我们将探讨微服务的安全与认证,包括身份认证、授权机制、API安全和数据保护等内容。
共享数据库模式
在某些情况下,多个服务可能需要共享数据库,但需要严格的访问控制。
# 共享数据库配置
apiVersion: v1
kind: ConfigMap
metadata:
name: shared-db-config
namespace: microservices
data:
database.yml: |
shared_database:
host: shared-postgres
port: 5432
database: shared_data
services:
user_service:
username: user_service
schema: user_service
permissions:
- SELECT
- INSERT
- UPDATE
- DELETE
tables:
- users
- user_profiles
order_service:
username: order_service
schema: order_service
permissions:
- SELECT
- INSERT
- UPDATE
- DELETE
tables:
- orders
- order_items
analytics_service:
username: analytics_service
schema: analytics
permissions:
- SELECT
tables:
- user_service.users
- order_service.orders
- order_service.order_items
-- 共享数据库权限管理脚本
-- shared-db-permissions.sql
-- 创建服务专用用户
CREATE USER user_service WITH PASSWORD 'user_service_password';
CREATE USER order_service WITH PASSWORD 'order_service_password';
CREATE USER analytics_service WITH PASSWORD 'analytics_service_password';
-- 创建服务专用模式
CREATE SCHEMA IF NOT EXISTS user_service AUTHORIZATION user_service;
CREATE SCHEMA IF NOT EXISTS order_service AUTHORIZATION order_service;
CREATE SCHEMA IF NOT EXISTS analytics AUTHORIZATION analytics_service;
-- 设置默认权限
ALTER DEFAULT PRIVILEGES IN SCHEMA user_service
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO user_service;
ALTER DEFAULT PRIVILEGES IN SCHEMA order_service
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO order_service;
ALTER DEFAULT PRIVILEGES IN SCHEMA analytics
GRANT SELECT ON TABLES TO analytics_service;
-- 为分析服务授予只读权限
GRANT USAGE ON SCHEMA user_service TO analytics_service;
GRANT USAGE ON SCHEMA order_service TO analytics_service;
GRANT SELECT ON ALL TABLES IN SCHEMA user_service TO analytics_service;
GRANT SELECT ON ALL TABLES IN SCHEMA order_service TO analytics_service;
-- 创建行级安全策略
ALTER TABLE user_service.users ENABLE ROW LEVEL SECURITY;
CREATE POLICY user_service_policy ON user_service.users
FOR ALL TO user_service
USING (true);
CREATE POLICY analytics_read_policy ON user_service.users
FOR SELECT TO analytics_service
USING (status = 'active');
分布式事务处理
Saga模式
Saga模式通过一系列本地事务来实现分布式事务。
// Saga模式实现
package saga
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// SagaStep Saga步骤
type SagaStep struct {
Name string
Execute func(ctx context.Context, data interface{}) (interface{}, error)
Compensate func(ctx context.Context, data interface{}) error
RetryPolicy *RetryPolicy
}
// RetryPolicy 重试策略
type RetryPolicy struct {
MaxAttempts int
Delay time.Duration
Multiplier float64
}
// SagaExecution Saga执行状态
type SagaExecution struct {
ID string
Steps []*SagaStep
CurrentStep int
Data interface{}
Status SagaStatus
StartTime time.Time
EndTime *time.Time
Error error
CompletedSteps []int
mutex sync.RWMutex
}
// SagaStatus Saga状态
type SagaStatus int
const (
SagaStatusPending SagaStatus = iota
SagaStatusRunning
SagaStatusCompleted
SagaStatusFailed
SagaStatusCompensating
SagaStatusCompensated
)
func (s SagaStatus) String() string {
switch s {
case SagaStatusPending:
return "PENDING"
case SagaStatusRunning:
return "RUNNING"
case SagaStatusCompleted:
return "COMPLETED"
case SagaStatusFailed:
return "FAILED"
case SagaStatusCompensating:
return "COMPENSATING"
case SagaStatusCompensated:
return "COMPENSATED"
default:
return "UNKNOWN"
}
}
// SagaOrchestrator Saga编排器
type SagaOrchestrator struct {
executions map[string]*SagaExecution
mutex sync.RWMutex
}
func NewSagaOrchestrator() *SagaOrchestrator {
return &SagaOrchestrator{
executions: make(map[string]*SagaExecution),
}
}
func (so *SagaOrchestrator) Execute(ctx context.Context, sagaID string, steps []*SagaStep, data interface{}) error {
execution := &SagaExecution{
ID: sagaID,
Steps: steps,
Data: data,
Status: SagaStatusPending,
StartTime: time.Now(),
CompletedSteps: make([]int, 0),
}
so.mutex.Lock()
so.executions[sagaID] = execution
so.mutex.Unlock()
return so.executeSteps(ctx, execution)
}
func (so *SagaOrchestrator) executeSteps(ctx context.Context, execution *SagaExecution) error {
execution.mutex.Lock()
execution.Status = SagaStatusRunning
execution.mutex.Unlock()
for i, step := range execution.Steps {
execution.mutex.Lock()
execution.CurrentStep = i
execution.mutex.Unlock()
log.Printf("Executing saga step %d: %s", i, step.Name)
result, err := so.executeStepWithRetry(ctx, step, execution.Data)
if err != nil {
log.Printf("Saga step %d failed: %v", i, err)
execution.mutex.Lock()
execution.Status = SagaStatusFailed
execution.Error = err
execution.mutex.Unlock()
// 执行补偿
return so.compensate(ctx, execution)
}
execution.mutex.Lock()
execution.Data = result
execution.CompletedSteps = append(execution.CompletedSteps, i)
execution.mutex.Unlock()
log.Printf("Saga step %d completed: %s", i, step.Name)
}
execution.mutex.Lock()
execution.Status = SagaStatusCompleted
now := time.Now()
execution.EndTime = &now
execution.mutex.Unlock()
log.Printf("Saga %s completed successfully", execution.ID)
return nil
}
func (so *SagaOrchestrator) executeStepWithRetry(ctx context.Context, step *SagaStep, data interface{}) (interface{}, error) {
var lastErr error
maxAttempts := 1
delay := time.Second
multiplier := 2.0
if step.RetryPolicy != nil {
maxAttempts = step.RetryPolicy.MaxAttempts
delay = step.RetryPolicy.Delay
multiplier = step.RetryPolicy.Multiplier
}
for attempt := 1; attempt <= maxAttempts; attempt++ {
result, err := step.Execute(ctx, data)
if err == nil {
return result, nil
}
lastErr = err
if attempt < maxAttempts {
log.Printf("Step %s attempt %d failed: %v, retrying in %v", step.Name, attempt, err, delay)
time.Sleep(delay)
delay = time.Duration(float64(delay) * multiplier)
}
}
return nil, fmt.Errorf("step %s failed after %d attempts: %w", step.Name, maxAttempts, lastErr)
}
func (so *SagaOrchestrator) compensate(ctx context.Context, execution *SagaExecution) error {
execution.mutex.Lock()
execution.Status = SagaStatusCompensating
execution.mutex.Unlock()
log.Printf("Starting compensation for saga %s", execution.ID)
// 逆序执行补偿
for i := len(execution.CompletedSteps) - 1; i >= 0; i-- {
stepIndex := execution.CompletedSteps[i]
step := execution.Steps[stepIndex]
if step.Compensate != nil {
log.Printf("Compensating step %d: %s", stepIndex, step.Name)
err := step.Compensate(ctx, execution.Data)
if err != nil {
log.Printf("Compensation failed for step %d: %v", stepIndex, err)
// 补偿失败,需要人工干预
return fmt.Errorf("compensation failed for step %s: %w", step.Name, err)
}
log.Printf("Compensation completed for step %d: %s", stepIndex, step.Name)
}
}
execution.mutex.Lock()
execution.Status = SagaStatusCompensated
now := time.Now()
execution.EndTime = &now
execution.mutex.Unlock()
log.Printf("Compensation completed for saga %s", execution.ID)
return execution.Error
}
func (so *SagaOrchestrator) GetExecution(sagaID string) (*SagaExecution, bool) {
so.mutex.RLock()
defer so.mutex.RUnlock()
execution, exists := so.executions[sagaID]
return execution, exists
}
func (so *SagaOrchestrator) GetStatus(sagaID string) (SagaStatus, bool) {
execution, exists := so.GetExecution(sagaID)
if !exists {
return SagaStatusPending, false
}
execution.mutex.RLock()
defer execution.mutex.RUnlock()
return execution.Status, true
}
// 订单处理Saga示例
type OrderData struct {
OrderID string
UserID string
ProductID string
Quantity int
Amount float64
PaymentID string
InventoryID string
}
func CreateOrderSaga() []*SagaStep {
return []*SagaStep{
{
Name: "ValidateOrder",
Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
orderData := data.(*OrderData)
log.Printf("Validating order %s", orderData.OrderID)
// 模拟订单验证
if orderData.Quantity <= 0 {
return nil, fmt.Errorf("invalid quantity: %d", orderData.Quantity)
}
return orderData, nil
},
Compensate: func(ctx context.Context, data interface{}) error {
// 验证步骤通常不需要补偿
return nil
},
},
{
Name: "ReserveInventory",
Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
orderData := data.(*OrderData)
log.Printf("Reserving inventory for order %s", orderData.OrderID)
// 模拟库存预留
orderData.InventoryID = fmt.Sprintf("inv_%s", orderData.OrderID)
return orderData, nil
},
Compensate: func(ctx context.Context, data interface{}) error {
orderData := data.(*OrderData)
log.Printf("Releasing inventory reservation %s", orderData.InventoryID)
// 模拟释放库存预留
return nil
},
RetryPolicy: &RetryPolicy{
MaxAttempts: 3,
Delay: time.Second,
Multiplier: 2.0,
},
},
{
Name: "ProcessPayment",
Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
orderData := data.(*OrderData)
log.Printf("Processing payment for order %s", orderData.OrderID)
// 模拟支付处理
orderData.PaymentID = fmt.Sprintf("pay_%s", orderData.OrderID)
return orderData, nil
},
Compensate: func(ctx context.Context, data interface{}) error {
orderData := data.(*OrderData)
log.Printf("Refunding payment %s", orderData.PaymentID)
// 模拟退款
return nil
},
RetryPolicy: &RetryPolicy{
MaxAttempts: 3,
Delay: time.Second,
Multiplier: 2.0,
},
},
{
Name: "CreateOrder",
Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
orderData := data.(*OrderData)
log.Printf("Creating order %s", orderData.OrderID)
// 模拟订单创建
return orderData, nil
},
Compensate: func(ctx context.Context, data interface{}) error {
orderData := data.(*OrderData)
log.Printf("Canceling order %s", orderData.OrderID)
// 模拟订单取消
return nil
},
},
}
}
// 使用示例
func main() {
orchestrator := NewSagaOrchestrator()
orderData := &OrderData{
OrderID: "order_123",
UserID: "user_456",
ProductID: "product_789",
Quantity: 2,
Amount: 99.99,
}
steps := CreateOrderSaga()
ctx := context.Background()
err := orchestrator.Execute(ctx, "saga_order_123", steps, orderData)
if err != nil {
log.Printf("Saga execution failed: %v", err)
}
// 检查执行状态
status, exists := orchestrator.GetStatus("saga_order_123")
if exists {
log.Printf("Saga status: %s", status)
}
}