3.1 同步通信模式
REST API通信
// REST API客户端实现
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// HTTPClient is the REST client interface.
//
// Every method takes a context, a URL and a map of request headers and returns
// the raw *http.Response or an error. Post and Put additionally take a request
// body value. The interface itself does not say how url is resolved or how body
// is encoded; that is up to the implementation.
type HTTPClient interface {
	// Get issues a request without a body.
	Get(ctx context.Context, url string, headers map[string]string) (*http.Response, error)
	// Post issues a request carrying body.
	Post(ctx context.Context, url string, body interface{}, headers map[string]string) (*http.Response, error)
	// Put issues a request carrying body.
	Put(ctx context.Context, url string, body interface{}, headers map[string]string) (*http.Response, error)
	// Delete issues a request without a body.
	Delete(ctx context.Context, url string, headers map[string]string) (*http.Response, error)
}
// RestClient is the REST client implementation. It wraps an *http.Client and
// prefixes every request URL with baseURL.
type RestClient struct {
	client  *http.Client
	baseURL string
	timeout time.Duration
}

// NewRestClient builds a RestClient whose underlying http.Client uses timeout
// as its overall request timeout and a transport with a bounded idle
// connection pool (keep-alives enabled).
func NewRestClient(baseURL string, timeout time.Duration) *RestClient {
	transport := &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     90 * time.Second,
		DisableKeepAlives:   false,
	}
	httpClient := &http.Client{
		Timeout:   timeout,
		Transport: transport,
	}
	return &RestClient{
		client:  httpClient,
		baseURL: baseURL,
		timeout: timeout,
	}
}

// Get sends a GET request to baseURL+url without a body.
func (c *RestClient) Get(ctx context.Context, url string, headers map[string]string) (*http.Response, error) {
	return c.doRequest(ctx, http.MethodGet, url, nil, headers)
}

// Post sends a POST request to baseURL+url with body encoded as JSON.
func (c *RestClient) Post(ctx context.Context, url string, body interface{}, headers map[string]string) (*http.Response, error) {
	return c.doRequest(ctx, http.MethodPost, url, body, headers)
}

// Put sends a PUT request to baseURL+url with body encoded as JSON.
func (c *RestClient) Put(ctx context.Context, url string, body interface{}, headers map[string]string) (*http.Response, error) {
	return c.doRequest(ctx, http.MethodPut, url, body, headers)
}

// Delete sends a DELETE request to baseURL+url without a body.
func (c *RestClient) Delete(ctx context.Context, url string, headers map[string]string) (*http.Response, error) {
	return c.doRequest(ctx, http.MethodDelete, url, nil, headers)
}

// doRequest is the shared request path for all verbs.
//
// A non-nil body is marshalled to JSON and sent as the request payload. The
// default Content-Type, Accept and User-Agent headers are set first, so any
// entry in headers with the same name replaces the default. The response is
// returned as-is; closing its Body is left to the caller. Marshalling, request
// construction and transport failures are each wrapped with a distinct prefix.
func (c *RestClient) doRequest(ctx context.Context, method, url string, body interface{}, headers map[string]string) (*http.Response, error) {
	// payload must stay an untyped nil interface when there is no body.
	var payload io.Reader
	if body != nil {
		encoded, err := json.Marshal(body)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal request body: %w", err)
		}
		payload = bytes.NewReader(encoded)
	}

	req, err := http.NewRequestWithContext(ctx, method, c.baseURL+url, payload)
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}

	// Defaults first ...
	defaults := [...][2]string{
		{"Content-Type", "application/json"},
		{"Accept", "application/json"},
		{"User-Agent", "microservice-client/1.0"},
	}
	for _, kv := range defaults {
		req.Header.Set(kv[0], kv[1])
	}
	// ... then caller-supplied headers, which win on conflict.
	for name, val := range headers {
		req.Header.Set(name, val)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("request failed: %w", err)
	}
	return resp, nil
}
// UserServiceClient is an example service client for the user service. It
// talks to the service through the HTTPClient interface.
type UserServiceClient struct {
	client HTTPClient
}

// NewUserServiceClient returns a UserServiceClient backed by a RestClient that
// targets baseURL with a 30 second request timeout.
func NewUserServiceClient(baseURL string) *UserServiceClient {
	const requestTimeout = 30 * time.Second
	rest := NewRestClient(baseURL, requestTimeout)
	return &UserServiceClient{client: rest}
}
// User is the JSON representation of a user that the UserServiceClient methods
// decode from response bodies.
type User struct {
	ID string `json:"id"`
	Username string `json:"username"`
	Email string `json:"email"`
	Status string `json:"status"`
}

// CreateUserRequest is the JSON payload sent by UserServiceClient.CreateUser.
// All three fields are always serialized, including when empty.
type CreateUserRequest struct {
	Username string `json:"username"`
	Email string `json:"email"`
	Password string `json:"password"`
}

// UpdateUserRequest is the JSON payload sent by UserServiceClient.UpdateUser.
// Every field is tagged omitempty, so empty strings are left out of the
// serialized body.
type UpdateUserRequest struct {
	Username string `json:"username,omitempty"`
	Email string `json:"email,omitempty"`
	Status string `json:"status,omitempty"`
}
// GetUser fetches the user identified by userID via GET /users/{userID},
// authenticating with authToken as a bearer token.
//
// Any status other than 200 OK is turned into an error that carries the status
// code and the response body text. On success the JSON body is decoded into a
// User.
func (c *UserServiceClient) GetUser(ctx context.Context, userID string, authToken string) (*User, error) {
	resp, err := c.client.Get(ctx, "/users/"+userID, map[string]string{
		"Authorization": "Bearer " + authToken,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to get user: %w", err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		raw, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("get user failed with status %d: %s", code, string(raw))
	}

	fetched := new(User)
	if err := json.NewDecoder(resp.Body).Decode(fetched); err != nil {
		return nil, fmt.Errorf("failed to decode response: %w", err)
	}
	return fetched, nil
}
// CreateUser creates a user via POST /users with req as the JSON body,
// authenticating with authToken as a bearer token.
//
// Only 201 Created counts as success; any other status becomes an error that
// carries the status code and the response body text. On success the JSON body
// is decoded into a User.
func (c *UserServiceClient) CreateUser(ctx context.Context, req *CreateUserRequest, authToken string) (*User, error) {
	resp, err := c.client.Post(ctx, "/users", req, map[string]string{
		"Authorization": "Bearer " + authToken,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create user: %w", err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code != http.StatusCreated {
		// Best effort: the body is only used to enrich the error message.
		raw, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("create user failed with status %d: %s", code, string(raw))
	}

	created := new(User)
	if err := json.NewDecoder(resp.Body).Decode(created); err != nil {
		return nil, fmt.Errorf("failed to decode response: %w", err)
	}
	return created, nil
}
// UpdateUser updates the user identified by userID via PUT /users/{userID}
// with req as the JSON body, authenticating with authToken as a bearer token.
//
// Any status other than 200 OK becomes an error that carries the status code
// and the response body text. On success the JSON body is decoded into a User.
func (c *UserServiceClient) UpdateUser(ctx context.Context, userID string, req *UpdateUserRequest, authToken string) (*User, error) {
	resp, err := c.client.Put(ctx, "/users/"+userID, req, map[string]string{
		"Authorization": "Bearer " + authToken,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to update user: %w", err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		raw, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("update user failed with status %d: %s", code, string(raw))
	}

	updated := new(User)
	if err := json.NewDecoder(resp.Body).Decode(updated); err != nil {
		return nil, fmt.Errorf("failed to decode response: %w", err)
	}
	return updated, nil
}
// DeleteUser deletes the user identified by userID via DELETE /users/{userID},
// authenticating with authToken as a bearer token.
//
// Only 204 No Content counts as success; any other status becomes an error
// that carries the status code and the response body text.
func (c *UserServiceClient) DeleteUser(ctx context.Context, userID string, authToken string) error {
	resp, err := c.client.Delete(ctx, "/users/"+userID, map[string]string{
		"Authorization": "Bearer " + authToken,
	})
	if err != nil {
		return fmt.Errorf("failed to delete user: %w", err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code == http.StatusNoContent {
		return nil
	}
	// Best effort: the body is only used to enrich the error message.
	raw, _ := io.ReadAll(resp.Body)
	return fmt.Errorf("delete user failed with status %d: %s", resp.StatusCode, string(raw))
}
// Usage example: create a user, fetch it back, then mark it active. The
// program stops at the first failing call after printing the error.
func main() {
	const (
		serviceURL = "http://user-service:8080"
		token      = "auth-token-123"
	)
	ctx := context.Background()

	// Build the user service client.
	users := NewUserServiceClient(serviceURL)

	// Create a user.
	user, err := users.CreateUser(ctx, &CreateUserRequest{
		Username: "john_doe",
		Email:    "john@example.com",
		Password: "password123",
	}, token)
	if err != nil {
		fmt.Printf("Failed to create user: %v\n", err)
		return
	}
	fmt.Printf("Created user: %+v\n", user)

	// Fetch the user that was just created.
	fetched, err := users.GetUser(ctx, user.ID, token)
	if err != nil {
		fmt.Printf("Failed to get user: %v\n", err)
		return
	}
	fmt.Printf("Fetched user: %+v\n", fetched)

	// Update the user's status.
	updated, err := users.UpdateUser(ctx, user.ID, &UpdateUserRequest{Status: "active"}, token)
	if err != nil {
		fmt.Printf("Failed to update user: %v\n", err)
		return
	}
	fmt.Printf("Updated user: %+v\n", updated)
}
API网关模式
API网关作为微服务架构的入口点,提供统一的访问接口。
# Kong API gateway configuration.
# ConfigMap holding two files: kong.conf (node settings) and kong.yml
# (DB-less declarative configuration).
apiVersion: v1
kind: ConfigMap
metadata:
  name: kong-config
  namespace: api-gateway
data:
  # NOTE(review): admin_listen binds the Admin API to every interface; combined
  # with a Service that publishes port 8001 this exposes it beyond the pod.
  kong.conf: |
    database = off
    declarative_config = /kong/declarative/kong.yml
    proxy_listen = 0.0.0.0:8000
    admin_listen = 0.0.0.0:8001
    log_level = info
  kong.yml: |
    _format_version: "3.0"
    _transform: true
    services:
      # user-service: rate limited, CORS enabled.
      - name: user-service
        url: http://user-service:8080
        plugins:
          - name: rate-limiting
            config:
              minute: 100
              hour: 1000
          - name: cors
            config:
              origins:
                - "*"
              methods:
                - GET
                - POST
                - PUT
                - DELETE
              headers:
                - Accept
                - Content-Type
                - Authorization
      # order-service: JWT protected, tagged with an X-Service-Name header.
      - name: order-service
        url: http://order-service:8080
        plugins:
          - name: jwt
            config:
              secret_is_base64: false
          - name: request-transformer
            config:
              add:
                headers:
                  - "X-Service-Name:order-service"
    routes:
      - name: user-routes
        service: user-service
        paths:
          - "/api/v1/users"
        strip_path: false
      - name: order-routes
        service: order-service
        paths:
          - "/api/v1/orders"
        strip_path: false
    consumers:
      - username: api-client
        # NOTE(review): the JWT secret is a plain-text placeholder stored in a
        # ConfigMap; replace it and move it to a Secret before real use.
        jwt_secrets:
          - key: api-key
            secret: my-secret-key
            algorithm: HS256
    # Global plugins (apply to every service and route).
    plugins:
      - name: prometheus
        config:
          per_consumer: true
      - name: zipkin
        config:
          http_endpoint: http://zipkin:9411/api/v2/spans
          sample_ratio: 0.1
---
# Deployment running two Kong replicas in DB-less mode. The kong-config
# ConfigMap is mounted at /kong/declarative, which is where
# KONG_DECLARATIVE_CONFIG points.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kong-gateway
  namespace: api-gateway
spec:
  replicas: 2
  selector:
    matchLabels:
      app: kong-gateway
  template:
    metadata:
      labels:
        app: kong-gateway
    spec:
      containers:
        - name: kong
          image: kong:3.4
          ports:
            - containerPort: 8000
              name: proxy
            - containerPort: 8001
              name: admin
          # These mirror the values in kong.conf from the ConfigMap.
          env:
            - name: KONG_DATABASE
              value: "off"
            - name: KONG_DECLARATIVE_CONFIG
              value: "/kong/declarative/kong.yml"
            - name: KONG_PROXY_LISTEN
              value: "0.0.0.0:8000"
            - name: KONG_ADMIN_LISTEN
              value: "0.0.0.0:8001"
            - name: KONG_LOG_LEVEL
              value: "info"
          volumeMounts:
            - name: kong-config
              mountPath: /kong/declarative
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          # Both probes query /status on the admin port, which is why the admin
          # listener has to be reachable on the pod IP.
          livenessProbe:
            httpGet:
              path: /status
              port: 8001
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /status
              port: 8001
            initialDelaySeconds: 5
            periodSeconds: 5
      volumes:
        - name: kong-config
          configMap:
            name: kong-config
---
# Service publishing the Kong pods through a load balancer.
# NOTE(review): the admin port (8001) is published next to the proxy port, so
# the Admin API is reachable from wherever the load balancer is; confirm this
# is intended or restrict it to an internal Service.
apiVersion: v1
kind: Service
metadata:
  name: kong-gateway
  namespace: api-gateway
spec:
  selector:
    app: kong-gateway
  ports:
    - name: proxy
      port: 8000
      targetPort: 8000
    - name: admin
      port: 8001
      targetPort: 8001
  type: LoadBalancer
// Go语言API网关实现
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"sync"
"time"
"github.com/gorilla/mux"
"golang.org/x/time/rate"
)
// Route is the configuration of one gateway route.
type Route struct {
	// Path and Method together form the "METHOD:PATH" key under which the
	// gateway stores the route; Path is also used for prefix matching.
	Path string `json:"path"`
	Method string `json:"method"`
	// ServiceName is sent upstream as X-Service-Name and used as the metrics label.
	ServiceName string `json:"service_name"`
	// ServiceURL is parsed as the reverse-proxy target for this route.
	ServiceURL string `json:"service_url"`
	// Middleware lists the middleware names applied in order; the gateway
	// recognizes "rate_limit", "auth" and "cors" and ignores anything else.
	Middleware []string `json:"middleware"`
	// Headers are set on the request before it is proxied.
	Headers map[string]string `json:"headers"`
	// Timeout bounds the proxied request when greater than zero.
	Timeout time.Duration `json:"timeout"`
}
// Gateway is the API gateway: an http.Handler that looks up a Route for each
// request, applies the route's middleware and proxies the request upstream.
type Gateway struct {
	// routes maps "METHOD:PATH" keys to routes; guarded by mutex.
	routes map[string]*Route
	rateLimiter *RateLimiter
	auth *AuthMiddleware
	logger *Logger
	metrics *Metrics
	// mutex protects routes.
	mutex sync.RWMutex
}
// NewGateway returns a Gateway with an empty route table and freshly
// constructed rate limiter, auth middleware, logger and metrics collector.
func NewGateway() *Gateway {
	gw := new(Gateway)
	gw.routes = make(map[string]*Route)
	gw.rateLimiter = NewRateLimiter()
	gw.auth = NewAuthMiddleware()
	gw.logger = NewLogger()
	gw.metrics = NewMetrics()
	return gw
}
// AddRoute registers route under the key "METHOD:PATH", replacing any route
// already stored under that key, and logs the registration.
func (g *Gateway) AddRoute(route *Route) {
	g.mutex.Lock()
	defer g.mutex.Unlock()

	g.routes[fmt.Sprintf("%s:%s", route.Method, route.Path)] = route
	log.Printf("Route added: %s %s -> %s", route.Method, route.Path, route.ServiceURL)
}
// RemoveRoute deletes the route stored under "METHOD:PATH" (a no-op when there
// is none) and logs the removal.
func (g *Gateway) RemoveRoute(method, path string) {
	g.mutex.Lock()
	defer g.mutex.Unlock()

	delete(g.routes, fmt.Sprintf("%s:%s", method, path))
	log.Printf("Route removed: %s %s", method, path)
}
// ServeHTTP handles one inbound request: it logs the request, resolves a
// route, runs the route's middleware in the configured order, proxies the
// request upstream and finally records the elapsed time.
//
// Responses produced by the gateway itself:
//   - 404 when no route matches,
//   - 429 when the "rate_limit" middleware rejects the request,
//   - 401 when the "auth" middleware rejects the request,
//   - the CORS preflight answer for OPTIONS requests on routes with "cors".
//
// Metrics are recorded only for requests that reach the proxy. Middleware
// names other than "rate_limit", "auth" and "cors" are ignored.
func (g *Gateway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	start := time.Now()

	// Log the request.
	g.logger.LogRequest(r)

	// Resolve the route.
	route := g.findRoute(r.Method, r.URL.Path)
	if route == nil {
		http.Error(w, "Route not found", http.StatusNotFound)
		return
	}

	// Apply middleware.
	for _, middleware := range route.Middleware {
		switch middleware {
		case "rate_limit":
			if !g.rateLimiter.Allow(r) {
				http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
				return
			}
		case "auth":
			if !g.auth.Authenticate(r) {
				http.Error(w, "Unauthorized", http.StatusUnauthorized)
				return
			}
		case "cors":
			g.applyCORS(w, r)
			// applyCORS has already written the preflight response; forwarding
			// the request as well would make the proxy write a second one.
			if r.Method == http.MethodOptions {
				return
			}
		}
	}

	// Proxy the request.
	g.proxyRequest(w, r, route)

	// Record metrics.
	duration := time.Since(start)
	g.metrics.RecordRequest(route.ServiceName, r.Method, duration)
}
// findRoute returns the route for method and path, or nil when none matches.
//
// An exact "METHOD:PATH" key wins. Otherwise the route with the same method
// whose path is the longest prefix of path is chosen, so the result no longer
// depends on map iteration order when several prefixes match.
//
// NOTE(review): the prefix test is a plain string prefix, so "/api/v1/users"
// also matches "/api/v1/users-archive"; confirm whether a path-segment
// boundary should be required.
func (g *Gateway) findRoute(method, path string) *Route {
	g.mutex.RLock()
	defer g.mutex.RUnlock()

	// Exact match.
	if route, ok := g.routes[fmt.Sprintf("%s:%s", method, path)]; ok {
		return route
	}

	// Longest-prefix match.
	var (
		best    *Route
		bestLen = -1
	)
	for key, route := range g.routes {
		// Split on the first ':' only, so route paths that themselves contain
		// a colon are still considered.
		parts := strings.SplitN(key, ":", 2)
		if len(parts) != 2 {
			continue
		}
		routeMethod, routePath := parts[0], parts[1]
		if routeMethod != method || !strings.HasPrefix(path, routePath) {
			continue
		}
		if len(routePath) > bestLen {
			best, bestLen = route, len(routePath)
		}
	}
	return best
}
// proxyRequest forwards r to route.ServiceURL through a single-host reverse
// proxy and streams the upstream response to w.
//
// An unparsable ServiceURL yields a 500. When route.Timeout is positive the
// request context is given that deadline. Route headers are set first, then
// X-Gateway-Time and X-Service-Name, so the latter two override route headers
// of the same name.
func (g *Gateway) proxyRequest(w http.ResponseWriter, r *http.Request, route *Route) {
	// Resolve the upstream target.
	upstream, err := url.Parse(route.ServiceURL)
	if err != nil {
		http.Error(w, "Invalid service URL", http.StatusInternalServerError)
		return
	}
	reverseProxy := httputil.NewSingleHostReverseProxy(upstream)

	// Bound the upstream call when the route asks for it.
	if limit := route.Timeout; limit > 0 {
		bounded, cancel := context.WithTimeout(r.Context(), limit)
		defer cancel()
		r = r.WithContext(bounded)
	}

	// Route-specific headers.
	for name, val := range route.Headers {
		r.Header.Set(name, val)
	}

	// Tracing headers.
	stamp := time.Now().Format(time.RFC3339)
	r.Header.Set("X-Gateway-Time", stamp)
	r.Header.Set("X-Service-Name", route.ServiceName)

	reverseProxy.ServeHTTP(w, r)
}
// applyCORS sets permissive CORS response headers on w and, for OPTIONS
// requests, writes a 200 status as the preflight answer.
func (g *Gateway) applyCORS(w http.ResponseWriter, r *http.Request) {
	corsHeaders := [...][2]string{
		{"Access-Control-Allow-Origin", "*"},
		{"Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"},
		{"Access-Control-Allow-Headers", "Content-Type, Authorization"},
	}
	out := w.Header()
	for _, kv := range corsHeaders {
		out.Set(kv[0], kv[1])
	}

	if r.Method == "OPTIONS" {
		w.WriteHeader(http.StatusOK)
	}
}
// RateLimiter keeps one token-bucket limiter per client key.
type RateLimiter struct {
	// limiters maps a client key to its limiter; guarded by mutex.
	limiters map[string]*rate.Limiter
	mutex    sync.RWMutex
}

// NewRateLimiter returns a RateLimiter with no limiters registered yet.
func NewRateLimiter() *RateLimiter {
	return &RateLimiter{
		limiters: make(map[string]*rate.Limiter),
	}
}

// Allow reports whether the request may proceed under its client's limit of
// 10 requests per second with a burst of 20. A limiter is created the first
// time a key is seen.
//
// NOTE(review): the key is r.RemoteAddr, which net/http documents as
// "IP:port", so separate connections from one host are limited independently,
// and entries are never evicted. An IP, user ID or similar would be a more
// stable key.
func (rl *RateLimiter) Allow(r *http.Request) bool {
	key := r.RemoteAddr // an IP, user ID, etc. could be used as the key instead

	rl.mutex.RLock()
	limiter, exists := rl.limiters[key]
	rl.mutex.RUnlock()

	if !exists {
		rl.mutex.Lock()
		// Re-check under the write lock: another goroutine may have created
		// the limiter between the read above and this point. Without this,
		// both would install their own limiter and one request would be
		// counted against an orphan.
		if limiter, exists = rl.limiters[key]; !exists {
			limiter = rate.NewLimiter(rate.Limit(10), 20) // 10 requests/second, burst of 20
			rl.limiters[key] = limiter
		}
		rl.mutex.Unlock()
	}

	return limiter.Allow()
}
// AuthMiddleware is the authentication middleware.
type AuthMiddleware struct {
	// jwtSecret is stored but not consulted by Authenticate.
	jwtSecret string
}

// NewAuthMiddleware returns an AuthMiddleware configured with a placeholder
// secret.
func NewAuthMiddleware() *AuthMiddleware {
	am := new(AuthMiddleware)
	am.jwtSecret = "your-secret-key"
	return am
}

// Authenticate reports whether r carries an Authorization header of the form
// "Bearer <token>" with a non-empty token.
//
// This is a simple presence check only: the token is not parsed or verified.
// Real JWT validation belongs here.
func (am *AuthMiddleware) Authenticate(r *http.Request) bool {
	const scheme = "Bearer "

	header := r.Header.Get("Authorization")
	if !strings.HasPrefix(header, scheme) {
		// Covers both a missing header and any other auth scheme.
		return false
	}
	return strings.TrimPrefix(header, scheme) != ""
}
// Logger is the gateway's request logger.
type Logger struct {
	// Logging configuration would go here.
}

// NewLogger returns a Logger.
func NewLogger() *Logger {
	return new(Logger)
}

// LogRequest writes one line per request to the standard logger: an RFC 3339
// timestamp, the method, the URL path and the remote address.
func (l *Logger) LogRequest(r *http.Request) {
	stamp := time.Now().Format(time.RFC3339)
	log.Printf("[%s] %s %s - %s", stamp, r.Method, r.URL.Path, r.RemoteAddr)
}
// Metrics is the gateway's metrics collector. It counts requests per
// "service:method" key.
type Metrics struct {
	// requests holds the per-key counters; guarded by mutex.
	requests map[string]int64
	mutex    sync.RWMutex
}

// NewMetrics returns a Metrics collector with no counters yet.
func NewMetrics() *Metrics {
	m := new(Metrics)
	m.requests = make(map[string]int64)
	return m
}

// RecordRequest increments the counter for service and method and logs how
// long the request took. duration is only logged, not stored.
func (m *Metrics) RecordRequest(service, method string, duration time.Duration) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.requests[fmt.Sprintf("%s:%s", service, method)]++
	log.Printf("Request to %s %s took %v", service, method, duration)
}

// GetMetrics returns a copy of the counters, so callers may modify the result
// without affecting the collector.
func (m *Metrics) GetMetrics() map[string]int64 {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	snapshot := make(map[string]int64, len(m.requests))
	for key, count := range m.requests {
		snapshot[key] = count
	}
	return snapshot
}
// Usage example: registers two routes on a gateway, exposes /health and
// /metrics next to it on the default mux and serves everything on :8080.
func main() {
	gateway := NewGateway()

	// Register routes.
	gateway.AddRoute(&Route{
		Path:        "/api/v1/users",
		Method:      "GET",
		ServiceName: "user-service",
		ServiceURL:  "http://user-service:8080",
		Middleware:  []string{"rate_limit", "auth", "cors"},
		Headers: map[string]string{
			"X-Gateway": "true",
		},
		Timeout: 30 * time.Second,
	})
	gateway.AddRoute(&Route{
		Path:        "/api/v1/orders",
		Method:      "POST",
		ServiceName: "order-service",
		ServiceURL:  "http://order-service:8080",
		Middleware:  []string{"rate_limit", "auth"},
		Timeout:     15 * time.Second,
	})

	// Health check endpoint.
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		// Headers must be set before WriteHeader; without this the JSON body
		// would go out with a sniffed, non-JSON content type.
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		if err := json.NewEncoder(w).Encode(map[string]string{"status": "healthy"}); err != nil {
			log.Printf("health: writing response: %v", err)
		}
	})

	// Metrics endpoint.
	http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		metrics := gateway.metrics.GetMetrics()
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(metrics); err != nil {
			log.Printf("metrics: writing response: %v", err)
		}
	})

	// Start the gateway. A nil Handler means the default mux, on which the
	// handlers above are registered.
	http.Handle("/", gateway)
	server := &http.Server{
		Addr: ":8080",
		// Bound how long a client may take to send request headers so idle or
		// slow connections cannot be held open indefinitely.
		ReadHeaderTimeout: 10 * time.Second,
	}
	log.Println("API Gateway starting on :8080")
	log.Fatal(server.ListenAndServe())
}
熔断器模式
熔断器模式用于防止级联故障,当服务不可用时快速失败。
// Go语言熔断器实现
package main
import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"
)
// CircuitState is the state of a circuit breaker.
type CircuitState int

// The three breaker states, numbered 0 (closed), 1 (half-open) and 2 (open).
const (
	StateClosed CircuitState = iota
	StateHalfOpen
	StateOpen
)

// String returns the upper-case name of the state, or "UNKNOWN" for any value
// outside the three defined states.
func (s CircuitState) String() string {
	names := [...]string{
		StateClosed:   "CLOSED",
		StateHalfOpen: "HALF_OPEN",
		StateOpen:     "OPEN",
	}
	if s < 0 || int(s) >= len(names) {
		return "UNKNOWN"
	}
	return names[s]
}
// CircuitBreakerConfig is the configuration accepted by NewCircuitBreaker.
// NewCircuitBreaker substitutes defaults for zero values: MaxRequests 0
// becomes 1, a non-positive Interval or Timeout becomes 60 seconds, a nil
// ReadyToTrip trips after more than 5 consecutive failures, and a nil
// IsSuccessful treats a nil error as success.
type CircuitBreakerConfig struct {
	MaxRequests uint32 // maximum number of requests admitted while half-open
	Interval time.Duration // statistics window
	Timeout time.Duration // how long the breaker stays open
	ReadyToTrip func(counts Counts) bool // decides whether the breaker should open
	OnStateChange func(name string, from CircuitState, to CircuitState) // state change callback
	IsSuccessful func(err error) bool // decides whether a request counts as successful
}
// Counts holds request statistics: how many requests were admitted, the total
// successes and failures, and the current run of consecutive successes or
// failures.
type Counts struct {
	Requests             uint32
	TotalSuccesses       uint32
	TotalFailures        uint32
	ConsecutiveSuccesses uint32
	ConsecutiveFailures  uint32
}

// onRequest counts one admitted request.
func (c *Counts) onRequest() {
	c.Requests += 1
}

// onSuccess counts one success and ends any run of consecutive failures.
func (c *Counts) onSuccess() {
	c.ConsecutiveFailures = 0
	c.ConsecutiveSuccesses += 1
	c.TotalSuccesses += 1
}

// onFailure counts one failure and ends any run of consecutive successes.
func (c *Counts) onFailure() {
	c.ConsecutiveSuccesses = 0
	c.ConsecutiveFailures += 1
	c.TotalFailures += 1
}

// clear resets every counter to zero.
func (c *Counts) clear() {
	*c = Counts{}
}
// Errors returned by CircuitBreaker.Execute when a request is rejected without
// being run. Their messages are unchanged from the previous inline errors, so
// string comparisons keep working; new code should prefer errors.Is.
var (
	// ErrCircuitOpen is returned while the breaker is open.
	ErrCircuitOpen = errors.New("circuit breaker is open")
	// ErrTooManyRequests is returned while the breaker is half-open and the
	// probe budget (maxRequests) is used up.
	ErrTooManyRequests = errors.New("too many requests")
)

// CircuitBreaker is a three-state circuit breaker.
//
// CLOSED: requests run; counts are cleared every interval; a failure for which
// readyToTrip returns true opens the breaker.
// OPEN: requests are rejected until timeout has elapsed, then the breaker
// becomes half-open.
// HALF_OPEN: at most maxRequests requests are admitted; the first success
// closes the breaker and the first failure re-opens it.
//
// Each state change (and each closed-state window) starts a new generation;
// results of requests started in an older generation are ignored.
type CircuitBreaker struct {
	name          string
	maxRequests   uint32
	interval      time.Duration
	timeout       time.Duration
	readyToTrip   func(counts Counts) bool
	isSuccessful  func(err error) bool
	onStateChange func(name string, from CircuitState, to CircuitState)

	// mutex guards the fields below.
	mutex      sync.Mutex
	state      CircuitState
	generation uint64
	counts     Counts
	expiry     time.Time
}

// NewCircuitBreaker builds a closed CircuitBreaker named name from config,
// substituting defaults for unset values: maxRequests 1, interval 60s,
// timeout 60s, tripping after more than 5 consecutive failures, and treating
// a nil error as success.
func NewCircuitBreaker(name string, config CircuitBreakerConfig) *CircuitBreaker {
	cb := &CircuitBreaker{
		name:          name,
		maxRequests:   config.MaxRequests,
		interval:      config.Interval,
		timeout:       config.Timeout,
		readyToTrip:   config.ReadyToTrip,
		isSuccessful:  config.IsSuccessful,
		onStateChange: config.OnStateChange,
		state:         StateClosed,
	}
	if cb.maxRequests == 0 {
		cb.maxRequests = 1
	}
	if cb.interval <= 0 {
		cb.interval = 60 * time.Second
	}
	if cb.timeout <= 0 {
		cb.timeout = 60 * time.Second
	}
	if cb.readyToTrip == nil {
		cb.readyToTrip = func(counts Counts) bool {
			return counts.ConsecutiveFailures > 5
		}
	}
	if cb.isSuccessful == nil {
		cb.isSuccessful = func(err error) bool {
			return err == nil
		}
	}
	// Computed after defaulting so that an unset Interval yields a first
	// window of the default length instead of one that has already expired.
	cb.expiry = time.Now().Add(cb.interval)
	return cb
}

// Name returns the breaker's name.
func (cb *CircuitBreaker) Name() string {
	return cb.name
}

// State returns the current state. It applies any time-based transition that
// is due (open to half-open, or a new closed-state window) before answering.
func (cb *CircuitBreaker) State() CircuitState {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	state, _ := cb.currentState(time.Now())
	return state
}

// Counts returns a copy of the statistics of the current generation.
func (cb *CircuitBreaker) Counts() Counts {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	return cb.counts
}

// Execute runs req if the breaker admits it and records the outcome.
//
// It returns ErrCircuitOpen or ErrTooManyRequests without calling req when the
// request is rejected. Otherwise it returns whatever req returned. A panic in
// req is counted as a failure and then re-raised.
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
	generation, err := cb.beforeRequest()
	if err != nil {
		return nil, err
	}

	defer func() {
		if e := recover(); e != nil {
			cb.afterRequest(generation, false)
			panic(e)
		}
	}()

	result, err := req()
	cb.afterRequest(generation, cb.isSuccessful(err))
	return result, err
}

// beforeRequest decides whether a request is admitted. On admission it counts
// the request and returns the generation the request belongs to.
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	state, generation := cb.currentState(time.Now())
	switch {
	case state == StateOpen:
		return generation, ErrCircuitOpen
	case state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests:
		return generation, ErrTooManyRequests
	}

	cb.counts.onRequest()
	return generation, nil
}

// afterRequest records the outcome of a request admitted in generation before.
// Outcomes from an older generation are dropped.
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	now := time.Now()
	state, generation := cb.currentState(now)
	if generation != before {
		return
	}

	if success {
		cb.onSuccess(state, now)
	} else {
		cb.onFailure(state, now)
	}
}

// onSuccess counts a success; in the half-open state it closes the breaker.
func (cb *CircuitBreaker) onSuccess(state CircuitState, now time.Time) {
	cb.counts.onSuccess()
	if state == StateHalfOpen {
		cb.setState(StateClosed, now)
	}
}

// onFailure counts a failure and opens the breaker when warranted.
func (cb *CircuitBreaker) onFailure(state CircuitState, now time.Time) {
	cb.counts.onFailure()

	// A failed probe means the dependency is still unhealthy: go straight back
	// to open. Half-open has no expiry, so without this the breaker could stay
	// half-open with its probe budget spent and reject every later request
	// with ErrTooManyRequests.
	if state == StateHalfOpen {
		cb.setState(StateOpen, now)
		return
	}

	if cb.readyToTrip(cb.counts) {
		cb.setState(StateOpen, now)
	}
}

// currentState applies any time-based transition that is due at now and
// returns the resulting state and generation. The caller must hold mutex.
func (cb *CircuitBreaker) currentState(now time.Time) (CircuitState, uint64) {
	switch cb.state {
	case StateClosed:
		// The statistics window has ended: start a fresh one.
		if !cb.expiry.IsZero() && cb.expiry.Before(now) {
			cb.toNewGeneration(now)
		}
	case StateOpen:
		// The open timeout has elapsed: start probing.
		if cb.expiry.Before(now) {
			cb.setState(StateHalfOpen, now)
		}
	}
	return cb.state, cb.generation
}

// setState switches to state, starts a new generation, invokes the
// onStateChange callback if one is set and logs the transition. It does
// nothing when the breaker is already in state. The caller must hold mutex,
// which means the callback runs with the breaker locked and must not call
// back into the breaker.
func (cb *CircuitBreaker) setState(state CircuitState, now time.Time) {
	if cb.state == state {
		return
	}

	prev := cb.state
	cb.state = state
	cb.toNewGeneration(now)

	if cb.onStateChange != nil {
		cb.onStateChange(cb.name, prev, state)
	}
	log.Printf("Circuit breaker '%s' state changed from %s to %s", cb.name, prev, state)
}

// toNewGeneration bumps the generation, clears the counts and sets the expiry
// that belongs to the current state: end of the statistics window when closed,
// end of the open timeout when open, none when half-open.
func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
	cb.generation++
	cb.counts.clear()

	var zero time.Time
	switch cb.state {
	case StateClosed:
		if cb.interval == 0 {
			cb.expiry = zero
		} else {
			cb.expiry = now.Add(cb.interval)
		}
	case StateOpen:
		cb.expiry = now.Add(cb.timeout)
	default: // StateHalfOpen
		cb.expiry = zero
	}
}
// RetryConfig is the retry configuration.
type RetryConfig struct {
	// MaxAttempts is the total number of calls, including the first one.
	MaxAttempts int
	// Delay is the wait before the first retry.
	Delay time.Duration
	// MaxDelay caps the backoff delay.
	MaxDelay time.Duration
	// Multiplier scales the delay after every failed attempt.
	Multiplier float64
	// Jitter, when true, varies each wait by up to ±10%.
	Jitter bool
}

// RetryableFunc is a function that can be retried.
type RetryableFunc func() error

// IsRetryable decides whether an error is worth retrying.
type IsRetryable func(error) bool

// Retry executes functions with exponential backoff.
type Retry struct {
	config      RetryConfig
	isRetryable IsRetryable
}

// NewRetry returns a Retry for config. Non-positive settings are replaced by
// defaults (3 attempts, 100ms initial delay, 30s maximum delay, multiplier 2).
// A nil isRetryable treats every error as retryable.
func NewRetry(config RetryConfig, isRetryable IsRetryable) *Retry {
	if config.MaxAttempts <= 0 {
		config.MaxAttempts = 3
	}
	if config.Delay <= 0 {
		config.Delay = 100 * time.Millisecond
	}
	if config.MaxDelay <= 0 {
		config.MaxDelay = 30 * time.Second
	}
	if config.Multiplier <= 0 {
		config.Multiplier = 2.0
	}
	if isRetryable == nil {
		isRetryable = func(err error) bool {
			return true // by default every error is retryable
		}
	}
	return &Retry{
		config:      config,
		isRetryable: isRetryable,
	}
}

// Execute calls fn until it succeeds, the attempts are used up, fn returns an
// error that is not retryable, or ctx is done.
//
// It returns nil on success, ctx.Err() when the context ends before or between
// attempts, the raw error when it is not retryable, and otherwise an error
// wrapping the last failure once MaxAttempts is reached.
func (r *Retry) Execute(ctx context.Context, fn RetryableFunc) error {
	var lastErr error
	delay := r.config.Delay

	for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ {
		if err := ctx.Err(); err != nil {
			return err
		}

		err := fn()
		if err == nil {
			return nil
		}
		lastErr = err

		if attempt == r.config.MaxAttempts {
			break
		}
		if !r.isRetryable(err) {
			return err
		}

		// Jitter is applied to the wait only, never to the backoff base, so it
		// cannot accumulate across attempts or push the base past MaxDelay.
		wait := delay
		if r.config.Jitter {
			wait = jitteredDelay(delay)
		}
		log.Printf("Attempt %d failed: %v, retrying in %v", attempt, err, wait)

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(wait):
		}

		// Compute the next delay; compare as floats so a huge product cannot
		// overflow the Duration conversion.
		next := float64(delay) * r.config.Multiplier
		if next > float64(r.config.MaxDelay) {
			delay = r.config.MaxDelay
		} else {
			delay = time.Duration(next)
		}
	}

	return fmt.Errorf("max attempts (%d) exceeded, last error: %w", r.config.MaxAttempts, lastErr)
}

// jitteredDelay returns d varied by a factor in [-10%, +10%]. The factor is
// derived from the low digits of the wall clock, which is enough to spread
// retries apart without pulling in a random number generator.
func jitteredDelay(d time.Duration) time.Duration {
	factor := float64(time.Now().UnixNano()%2001-1000) / 10000 // [-0.1, 0.1]
	return d + time.Duration(float64(d)*factor)
}
// ResilientClient is a resilient client that combines a circuit breaker, a
// retry policy and an overall timeout.
type ResilientClient struct {
	circuitBreaker *CircuitBreaker
	retry          *Retry
	timeout        time.Duration
}

// NewResilientClient builds a ResilientClient whose breaker is named name and
// configured by cbConfig, and whose retry policy is configured by retryConfig.
// The retry policy treats every error as retryable except the breaker's
// "circuit breaker is open" rejection, which is matched by message text.
func NewResilientClient(name string, cbConfig CircuitBreakerConfig, retryConfig RetryConfig, timeout time.Duration) *ResilientClient {
	// Do not retry while the breaker is open.
	retryable := func(err error) bool {
		return err.Error() != "circuit breaker is open"
	}
	return &ResilientClient{
		circuitBreaker: NewCircuitBreaker(name, cbConfig),
		retry:          NewRetry(retryConfig, retryable),
		timeout:        timeout,
	}
}

// Execute runs fn through the circuit breaker under the retry policy.
//
// When the client has a positive timeout, the context handed to the retry
// loop gets that deadline; fn itself does not receive the context. The result
// of the last breaker call is returned on success; when the retry loop gives
// up, its error is returned with a nil result.
func (rc *ResilientClient) Execute(ctx context.Context, fn func() (interface{}, error)) (interface{}, error) {
	if rc.timeout > 0 {
		bounded, cancel := context.WithTimeout(ctx, rc.timeout)
		defer cancel()
		ctx = bounded
	}

	var (
		result  interface{}
		callErr error
	)
	attempt := func() error {
		result, callErr = rc.circuitBreaker.Execute(fn)
		return callErr
	}

	if err := rc.retry.Execute(ctx, attempt); err != nil {
		return nil, err
	}
	return result, callErr
}
// 使用示例
func main() {
// 创建熔断器配置
cbConfig := CircuitBreakerConfig{
MaxRequests: 3,
Interval: 10 * time.Second,
Timeout: 30 * time.Second,
ReadyToTrip: func(counts Counts) bool {
failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
return counts.Requests >= 3 && failureRatio >= 0.6
},
OnStateChange: func(name string, from CircuitState, to CircuitState) {
log.Printf("Circuit breaker '%s' changed from %s to %s", name, from, to)
},
IsSuccessful: func(err error) bool {
return err == nil
},
}
// 创建重试配置
retryConfig := RetryConfig{
MaxAttempts: 3,
Delay: 100 * time.Millisecond,
MaxDelay: 5 * time.Second,
Multiplier: 2.0,
Jitter: true,
}
// 创建弹性客户端
client := NewResilientClient("user-service", cbConfig, retryConfig, 10*time.Second)
ctx := context.Background()
// 模拟服务调用
for i := 0; i < 10; i++ {
result, err := client.Execute(ctx, func() (interface{}, error) {
// 模拟服务调用
if i%3 == 0 {
return nil, errors.New("service unavailable")
}
return fmt.Sprintf("Success %d", i), nil
})
if err != nil {
log.Printf("Request %d failed: %v", i, err)
} else {
log.Printf("Request %d succeeded: %v", i, result)
}
time.Sleep(1 * time.Second)
}
// 打印熔断器状态
log.Printf("Final circuit breaker state: %s", client.circuitBreaker.State())
log.Printf("Final counts: %+v", client.circuitBreaker.Counts())
}
gRPC通信
前面已经介绍了gRPC的基本实现,这里不再重复。
本章总结
本章详细介绍了微服务架构中的服务间通信模式和技术实现:
主要内容回顾
同步通信模式
- REST API:基于HTTP的轻量级通信方式
- gRPC:高性能的RPC框架,支持多种语言
- 适用于需要立即响应的场景
异步通信模式
- 消息队列:解耦服务间依赖,提高系统弹性
- 事件流:支持事件驱动架构,实现最终一致性
- 适用于可以容忍延迟的场景
服务发现与负载均衡
- 服务注册与发现:动态管理服务实例
- 负载均衡算法:轮询、随机、加权轮询等
- 健康检查:确保只路由到健康的服务实例
API网关模式
- 统一入口:简化客户端调用
- 横切关注点:认证、限流、监控等
- 协议转换:支持多种通信协议
弹性设计模式
- 熔断器:防止级联故障
- 重试机制:处理临时性故障
- 超时控制:避免资源耗尽
最佳实践
选择合适的通信模式
- 同步通信用于查询操作
- 异步通信用于事件通知
- 混合使用以满足不同需求
实施弹性设计
- 始终考虑故障场景
- 实现优雅降级
- 监控和告警
性能优化
- 连接池管理
- 请求批处理
- 缓存策略
安全考虑
- 服务间认证
- 传输加密
- 访问控制
下一章预告
下一章将探讨微服务架构中的数据管理与一致性问题,包括:
- 数据库设计模式
- 分布式事务处理
- 数据一致性策略
- CQRS和Event Sourcing模式
// user_service.proto
syntax = "proto3";
package user;
option go_package = "./proto/user";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
// User service definition.
service UserService {
  // Get a user.
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  // Create a user.
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
  // Update a user.
  rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
  // Delete a user.
  rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
  // List users.
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
  // Stream user update events (server-side streaming).
  rpc WatchUserUpdates(WatchUserUpdatesRequest) returns (stream UserUpdateEvent);
}
// User message.
message User {
  string id = 1;
  string username = 2;
  string email = 3;
  string status = 4;
  google.protobuf.Timestamp created_at = 5;
  google.protobuf.Timestamp updated_at = 6;
}
// Request and response messages.

// Request for GetUser: the ID of the user to fetch.
message GetUserRequest {
  string id = 1;
}
// Response for GetUser.
message GetUserResponse {
  User user = 1;
}
// Request for CreateUser.
message CreateUserRequest {
  string username = 1;
  string email = 2;
  string password = 3;
}
// Response for CreateUser.
message CreateUserResponse {
  User user = 1;
}
// Request for UpdateUser: id selects the user, the other fields carry the new
// values.
message UpdateUserRequest {
  string id = 1;
  string username = 2;
  string email = 3;
  string status = 4;
}
// Response for UpdateUser.
message UpdateUserResponse {
  User user = 1;
}
// Request for DeleteUser: the ID of the user to delete.
message DeleteUserRequest {
  string id = 1;
}
// Request for ListUsers: page number, page size and a filter string.
message ListUsersRequest {
  int32 page = 1;
  int32 page_size = 2;
  string filter = 3;
}
// Response for ListUsers: one page of users plus paging information.
message ListUsersResponse {
  repeated User users = 1;
  int32 total = 2;
  int32 page = 3;
  int32 page_size = 4;
}
// Request for WatchUserUpdates: the IDs of the users to watch.
message WatchUserUpdatesRequest {
  repeated string user_ids = 1;
}
// Event streamed by WatchUserUpdates: what happened, to which user, and when.
message UserUpdateEvent {
  // Kind of change. CREATED is the zero value, so an unset event_type reads as
  // CREATED.
  enum EventType {
    CREATED = 0;
    UPDATED = 1;
    DELETED = 2;
  }
  EventType event_type = 1;
  User user = 2;
  google.protobuf.Timestamp timestamp = 3;
}
// gRPC服务端实现
package main
import (
"context"
"fmt"
"log"
"net"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
pb "./proto/user"
)
// UserServiceServer is the gRPC service implementation. It keeps users in
// memory and fans update events out to registered watchers.
type UserServiceServer struct {
	pb.UnimplementedUserServiceServer
	// users maps user ID to user; guarded by mutex.
	users map[string]*pb.User
	// mutex guards users and watchers.
	mutex sync.RWMutex
	// watchers maps a user ID to the event channels of the streams watching it.
	watchers map[string][]chan *pb.UserUpdateEvent
}
// NewUserServiceServer returns a server with empty user and watcher tables.
func NewUserServiceServer() *UserServiceServer {
	srv := new(UserServiceServer)
	srv.users = make(map[string]*pb.User)
	srv.watchers = make(map[string][]chan *pb.UserUpdateEvent)
	return srv
}
// GetUser returns the stored user with ID req.Id, or a NotFound status error
// when there is none. The response references the stored user object itself,
// not a copy.
func (s *UserServiceServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	if found, ok := s.users[req.Id]; ok {
		return &pb.GetUserResponse{User: found}, nil
	}
	return nil, status.Errorf(codes.NotFound, "user not found: %s", req.Id)
}
// CreateUser validates req, stores a new active user and notifies watchers
// with a CREATED event.
//
// It returns InvalidArgument when the username or email is empty and
// AlreadyExists when either is already taken. The user ID is derived from the
// current time in nanoseconds. req.Password is not read here.
func (s *UserServiceServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Validate input.
	switch {
	case req.Username == "":
		return nil, status.Errorf(codes.InvalidArgument, "username is required")
	case req.Email == "":
		return nil, status.Errorf(codes.InvalidArgument, "email is required")
	}

	// Reject duplicates of an existing username or email.
	for _, existing := range s.users {
		if existing.Username == req.Username {
			return nil, status.Errorf(codes.AlreadyExists, "username already exists: %s", req.Username)
		}
		if existing.Email == req.Email {
			return nil, status.Errorf(codes.AlreadyExists, "email already exists: %s", req.Email)
		}
	}

	// Create and store the user.
	id := fmt.Sprintf("user_%d", time.Now().UnixNano())
	createdAt := timestamppb.Now()
	created := &pb.User{
		Id:        id,
		Username:  req.Username,
		Email:     req.Email,
		Status:    "active",
		CreatedAt: createdAt,
		UpdatedAt: createdAt,
	}
	s.users[id] = created

	// Notify watchers.
	s.notifyWatchers(&pb.UserUpdateEvent{
		EventType: pb.UserUpdateEvent_CREATED,
		User:      created,
		Timestamp: createdAt,
	})

	return &pb.CreateUserResponse{User: created}, nil
}
// UpdateUser applies the non-empty fields of req to the stored user with ID
// req.Id, refreshes its update timestamp and notifies watchers with an UPDATED
// event. It returns NotFound when the user does not exist. The stored user is
// modified in place.
func (s *UserServiceServer) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.UpdateUserResponse, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	target, ok := s.users[req.Id]
	if !ok {
		return nil, status.Errorf(codes.NotFound, "user not found: %s", req.Id)
	}

	// Empty request fields mean "leave unchanged".
	if name := req.Username; name != "" {
		target.Username = name
	}
	if email := req.Email; email != "" {
		target.Email = email
	}
	if st := req.Status; st != "" {
		target.Status = st
	}
	target.UpdatedAt = timestamppb.Now()

	// Notify watchers.
	s.notifyWatchers(&pb.UserUpdateEvent{
		EventType: pb.UserUpdateEvent_UPDATED,
		User:      target,
		Timestamp: target.UpdatedAt,
	})

	return &pb.UpdateUserResponse{User: target}, nil
}
// DeleteUser removes the user with ID req.Id and notifies watchers with a
// DELETED event carrying the removed user. It returns NotFound when the user
// does not exist.
func (s *UserServiceServer) DeleteUser(ctx context.Context, req *pb.DeleteUserRequest) (*emptypb.Empty, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	removed, ok := s.users[req.Id]
	if !ok {
		return nil, status.Errorf(codes.NotFound, "user not found: %s", req.Id)
	}
	delete(s.users, req.Id)

	// Notify watchers.
	deleted := &pb.UserUpdateEvent{
		EventType: pb.UserUpdateEvent_DELETED,
		User:      removed,
		Timestamp: timestamppb.Now(),
	}
	s.notifyWatchers(deleted)

	return &emptypb.Empty{}, nil
}
// ListUsers returns one page of users together with the total user count and
// the effective page and page size.
//
// A non-positive page defaults to 1 and a non-positive page size to 10. A page
// beyond the end yields an empty list. req.Filter is not applied.
//
// NOTE(review): users are collected by ranging over a map, so their order, and
// therefore the content of a given page, can differ between calls; sort by a
// stable key if callers rely on consistent paging.
func (s *UserServiceServer) ListUsers(ctx context.Context, req *pb.ListUsersRequest) (*pb.ListUsersResponse, error) {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	users := make([]*pb.User, 0, len(s.users))
	for _, user := range s.users {
		users = append(users, user)
	}

	// Simple pagination.
	total := int64(len(users))
	page := req.Page
	pageSize := req.PageSize
	if page <= 0 {
		page = 1
	}
	if pageSize <= 0 {
		pageSize = 10
	}

	// Do the arithmetic in int64: page and pageSize come from the client, and
	// their int32 product can wrap to a negative offset, which would panic
	// when used as a slice index.
	start := (int64(page) - 1) * int64(pageSize)
	if start >= total {
		users = []*pb.User{}
	} else {
		end := start + int64(pageSize)
		if end > total {
			end = total
		}
		users = users[start:end]
	}

	return &pb.ListUsersResponse{
		Users:    users,
		Total:    int32(total),
		Page:     page,
		PageSize: pageSize,
	}, nil
}
// WatchUserUpdates streams update events for the users listed in req.UserIds
// until the stream's context ends or sending fails.
//
// The stream gets one buffered channel (capacity 100) that is registered under
// every requested user ID. On return the channel is unregistered from each of
// those IDs and then closed.
func (s *UserServiceServer) WatchUserUpdates(req *pb.WatchUserUpdatesRequest, stream pb.UserService_WatchUserUpdatesServer) error {
	// Event channel for this stream.
	events := make(chan *pb.UserUpdateEvent, 100)

	// Register as a watcher of every requested user.
	s.mutex.Lock()
	for _, id := range req.UserIds {
		s.watchers[id] = append(s.watchers[id], events)
	}
	s.mutex.Unlock()

	// Cleanup: unregister under the lock first, close afterwards, so that no
	// notifier can still find the channel once it is closed.
	unregister := func() {
		s.mutex.Lock()
		for _, id := range req.UserIds {
			registered := s.watchers[id]
			for idx := range registered {
				if registered[idx] != events {
					continue
				}
				s.watchers[id] = append(registered[:idx], registered[idx+1:]...)
				break
			}
		}
		s.mutex.Unlock()
		close(events)
	}
	defer unregister()

	// Forward events until the client goes away.
	for {
		select {
		case ev := <-events:
			if err := stream.Send(ev); err != nil {
				return err
			}
		case <-stream.Context().Done():
			return stream.Context().Err()
		}
	}
}
// notifyWatchers offers event to every channel registered for the event's
// user. The send never blocks: when a channel's buffer is full the event is
// dropped for that watcher. It reads s.watchers without locking; the callers
// in this file invoke it while holding s.mutex.
func (s *UserServiceServer) notifyWatchers(event *pb.UserUpdateEvent) {
	for _, sink := range s.watchers[event.User.Id] {
		select {
		case sink <- event:
		default:
			// Channel is full; skip this watcher.
		}
	}
}
// main starts the gRPC server with the user service registered, listening on
// TCP port 9090. It exits with a fatal log when listening or serving fails.
func main() {
	// Create the gRPC server and register the service.
	grpcServer := grpc.NewServer()
	pb.RegisterUserServiceServer(grpcServer, NewUserServiceServer())

	// Listen on the port.
	lis, err := net.Listen("tcp", ":9090")
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}
	log.Println("gRPC server listening on :9090")

	// Serve until the server stops.
	err = grpcServer.Serve(lis)
	if err != nil {
		log.Fatalf("Failed to serve: %v", err)
	}
}
// gRPC客户端实现
package main
import (
"context"
"fmt"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "./proto/user"
)
// UserServiceClient bundles the generated gRPC stub with the connection it
// was created from, so the connection can be released through Close.
type UserServiceClient struct {
	client pb.UserServiceClient // generated stub used for every RPC
	conn   *grpc.ClientConn     // underlying connection; closed by Close
}
// NewUserServiceClient dials address with insecure (plaintext) transport
// credentials and returns a client wrapping the resulting connection.
// The dial error, if any, is wrapped and returned.
func NewUserServiceClient(address string) (*UserServiceClient, error) {
	creds := grpc.WithTransportCredentials(insecure.NewCredentials())

	cc, dialErr := grpc.Dial(address, creds)
	if dialErr != nil {
		return nil, fmt.Errorf("failed to connect: %w", dialErr)
	}

	wrapper := &UserServiceClient{conn: cc}
	wrapper.client = pb.NewUserServiceClient(cc)
	return wrapper, nil
}
// Close closes the underlying gRPC connection and returns whatever error the
// connection's own Close reports.
func (c *UserServiceClient) Close() error {
	return c.conn.Close()
}
// CreateUser sends a CreateUser RPC with the given username, email and
// password and returns the user carried in the response.
func (c *UserServiceClient) CreateUser(ctx context.Context, username, email, password string) (*pb.User, error) {
	reply, rpcErr := c.client.CreateUser(ctx, &pb.CreateUserRequest{
		Username: username,
		Email:    email,
		Password: password,
	})
	if rpcErr != nil {
		return nil, fmt.Errorf("failed to create user: %w", rpcErr)
	}
	return reply.User, nil
}
// GetUser fetches the user identified by userID and returns the user carried
// in the response.
func (c *UserServiceClient) GetUser(ctx context.Context, userID string) (*pb.User, error) {
	reply, rpcErr := c.client.GetUser(ctx, &pb.GetUserRequest{Id: userID})
	if rpcErr != nil {
		return nil, fmt.Errorf("failed to get user: %w", rpcErr)
	}
	return reply.User, nil
}
// UpdateUser sends an UpdateUser RPC carrying the given username, email and
// status for userID, and returns the user carried in the response.
func (c *UserServiceClient) UpdateUser(ctx context.Context, userID, username, email, status string) (*pb.User, error) {
	reply, rpcErr := c.client.UpdateUser(ctx, &pb.UpdateUserRequest{
		Id:       userID,
		Username: username,
		Email:    email,
		Status:   status,
	})
	if rpcErr != nil {
		return nil, fmt.Errorf("failed to update user: %w", rpcErr)
	}
	return reply.User, nil
}
// DeleteUser sends a DeleteUser RPC for userID. The response body is
// discarded; only the error, if any, is reported.
func (c *UserServiceClient) DeleteUser(ctx context.Context, userID string) error {
	if _, rpcErr := c.client.DeleteUser(ctx, &pb.DeleteUserRequest{Id: userID}); rpcErr != nil {
		return fmt.Errorf("failed to delete user: %w", rpcErr)
	}
	return nil
}
// ListUsers requests one page of users and returns the Users slice of the
// response. Other response fields are not surfaced to the caller.
func (c *UserServiceClient) ListUsers(ctx context.Context, page, pageSize int32) ([]*pb.User, error) {
	reply, rpcErr := c.client.ListUsers(ctx, &pb.ListUsersRequest{
		Page:     page,
		PageSize: pageSize,
	})
	if rpcErr != nil {
		return nil, fmt.Errorf("failed to list users: %w", rpcErr)
	}
	return reply.Users, nil
}
// WatchUserUpdates opens a server stream for the given user IDs and prints
// every event it receives. It returns nil when the server ends the stream
// (io.EOF) and a wrapped error when opening the stream or receiving fails.
func (c *UserServiceClient) WatchUserUpdates(ctx context.Context, userIDs []string) error {
	stream, err := c.client.WatchUserUpdates(ctx, &pb.WatchUserUpdatesRequest{
		UserIds: userIDs,
	})
	if err != nil {
		return fmt.Errorf("failed to watch user updates: %w", err)
	}

	for {
		event, err := stream.Recv()
		if err == io.EOF {
			// The server closed the stream cleanly.
			return nil
		}
		if err != nil {
			return fmt.Errorf("failed to receive event: %w", err)
		}
		// Generated getters are nil-safe, so an event that carries no User
		// prints an empty ID instead of panicking.
		fmt.Printf("Received event: %s for user %s\n", event.GetEventType(), event.GetUser().GetId())
	}
}
// main runs the client demo and exits non-zero when a required step fails.
func main() {
	if err := runClientDemo(); err != nil {
		log.Fatal(err)
	}
}

// runClientDemo exercises the user service end to end: create, get, update
// and list a user, then watch for updates while a second update is sent.
//
// It returns an error instead of calling log.Fatalf so that the deferred
// client.Close and cancel run on every failure path. The error text is
// capitalized on purpose, so the logged output matches the messages this
// program has always printed.
func runClientDemo() error {
	// Create the client.
	client, err := NewUserServiceClient("localhost:9090")
	if err != nil {
		return fmt.Errorf("Failed to create client: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Create a user.
	user, err := client.CreateUser(ctx, "john_doe", "john@example.com", "password123")
	if err != nil {
		return fmt.Errorf("Failed to create user: %w", err)
	}
	fmt.Printf("Created user: %+v\n", user)

	// Fetch it back.
	fetchedUser, err := client.GetUser(ctx, user.Id)
	if err != nil {
		return fmt.Errorf("Failed to get user: %w", err)
	}
	fmt.Printf("Fetched user: %+v\n", fetchedUser)

	// Update it.
	updatedUser, err := client.UpdateUser(ctx, user.Id, "john_smith", "", "inactive")
	if err != nil {
		return fmt.Errorf("Failed to update user: %w", err)
	}
	fmt.Printf("Updated user: %+v\n", updatedUser)

	// List users.
	users, err := client.ListUsers(ctx, 1, 10)
	if err != nil {
		return fmt.Errorf("Failed to list users: %w", err)
	}
	fmt.Printf("Listed %d users\n", len(users))

	// Watch for updates in a separate goroutine with its own timeout.
	go func() {
		watchCtx, watchCancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer watchCancel()
		if err := client.WatchUserUpdates(watchCtx, []string{user.Id}); err != nil {
			log.Printf("Watch failed: %v", err)
		}
	}()

	// Give the watch a moment to register on the server.
	time.Sleep(1 * time.Second)

	// Update the user again to trigger an event. A failure here is logged
	// but does not abort the demo.
	if _, err = client.UpdateUser(ctx, user.Id, "", "john.smith@example.com", "active"); err != nil {
		log.Printf("Failed to update user: %v", err)
	}

	// Leave time for the event to be delivered and printed.
	time.Sleep(2 * time.Second)
	return nil
}
3.2 异步通信模式
消息队列实现
# 基于RabbitMQ的异步通信实现
import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, Any, Callable, Optional
from dataclasses import dataclass, asdict
from abc import ABC, abstractmethod
import aio_pika
from aio_pika import Message, DeliveryMode
from aio_pika.abc import AbstractRobustConnection, AbstractRobustChannel
# Configure logging: root logger at INFO, plus a module-level logger named
# after this module that the rest of the file logs through.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class MessageEnvelope:
    """Message envelope: routing metadata plus an optional payload.

    ``id``, ``type``, ``source`` and ``timestamp`` are required;
    ``correlation_id``, ``reply_to`` and ``data`` default to ``None``.
    """
    id: str
    type: str
    source: str
    timestamp: str
    correlation_id: Optional[str] = None
    reply_to: Optional[str] = None
    # Optional because the default is None; consumers must tolerate a
    # missing payload.
    data: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the envelope as a plain dict (via ``dataclasses.asdict``)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MessageEnvelope':
        """Build an envelope from a dict.

        Keys that are not envelope fields are ignored, so a message produced
        by a newer sender with extra fields can still be decoded. Missing
        required fields still raise ``TypeError``.
        """
        # Local import keeps this method self-contained; the module header
        # only imports ``dataclass`` and ``asdict``.
        from dataclasses import fields

        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})
class MessageHandler(ABC):
    """Abstract interface every message handler implements."""

    @abstractmethod
    async def handle(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """Process ``envelope``; return a response envelope, or None if there is none."""

    @abstractmethod
    def get_message_types(self) -> list[str]:
        """Return the message type names this handler processes."""
class MessageBus:
    """Message bus built on one aio_pika connection and channel.

    Declared exchanges and queues are cached by name. Handlers are kept in a
    single registry keyed by message type, shared by every subscribed queue.
    """

    def __init__(self, connection_url: str):
        self.connection_url = connection_url
        self.connection: Optional[AbstractRobustConnection] = None
        self.channel: Optional[AbstractRobustChannel] = None
        self.handlers: Dict[str, list[MessageHandler]] = {}
        self.exchanges: Dict[str, Any] = {}
        self.queues: Dict[str, Any] = {}

    async def connect(self):
        """Connect to the broker and open a channel with a prefetch count of 10.

        Any connection error is logged and re-raised.
        """
        try:
            self.connection = await aio_pika.connect_robust(self.connection_url)
            self.channel = await self.connection.channel()
            await self.channel.set_qos(prefetch_count=10)
            logger.info("Connected to message broker")
        except Exception as e:
            logger.error(f"Failed to connect to message broker: {e}")
            raise

    async def disconnect(self):
        """Close the connection, if any, and drop all state derived from it."""
        if self.connection:
            try:
                await self.connection.close()
            finally:
                # Forget the channel and the cached exchanges/queues so that
                # later publish/declare calls fail with "Not connected"
                # instead of operating on objects tied to a closed connection.
                self.connection = None
                self.channel = None
                self.exchanges.clear()
                self.queues.clear()
            logger.info("Disconnected from message broker")

    async def declare_exchange(self, name: str, exchange_type: str = "topic", durable: bool = True):
        """Declare an exchange, cache it under ``name`` and return it.

        Raises RuntimeError when the bus is not connected.
        """
        if not self.channel:
            raise RuntimeError("Not connected to message broker")
        exchange = await self.channel.declare_exchange(
            name,
            type=exchange_type,
            durable=durable
        )
        self.exchanges[name] = exchange
        logger.info(f"Declared exchange: {name}")
        return exchange

    async def declare_queue(self, name: str, durable: bool = True, exclusive: bool = False):
        """Declare a queue, cache it under ``name`` and return it.

        Raises RuntimeError when the bus is not connected.
        """
        if not self.channel:
            raise RuntimeError("Not connected to message broker")
        queue = await self.channel.declare_queue(
            name,
            durable=durable,
            exclusive=exclusive
        )
        self.queues[name] = queue
        logger.info(f"Declared queue: {name}")
        return queue

    async def bind_queue(self, queue_name: str, exchange_name: str, routing_key: str):
        """Bind a previously declared queue to a previously declared exchange.

        Raises ValueError when either name has not been declared on this bus.
        """
        queue = self.queues.get(queue_name)
        exchange = self.exchanges.get(exchange_name)
        if not queue or not exchange:
            raise ValueError("Queue or exchange not found")
        await queue.bind(exchange, routing_key)
        logger.info(f"Bound queue {queue_name} to exchange {exchange_name} with routing key {routing_key}")

    async def publish(self,
                      exchange_name: str,
                      routing_key: str,
                      envelope: MessageEnvelope,
                      delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT):
        """Serialize ``envelope`` as JSON and publish it to a declared exchange.

        Raises RuntimeError when not connected and ValueError when the
        exchange has not been declared. ``envelope.timestamp`` must be an
        ISO-8601 string, because it is parsed with ``datetime.fromisoformat``.
        """
        if not self.channel:
            raise RuntimeError("Not connected to message broker")
        exchange = self.exchanges.get(exchange_name)
        if not exchange:
            raise ValueError(f"Exchange {exchange_name} not found")
        message_body = json.dumps(envelope.to_dict()).encode()
        message = Message(
            message_body,
            delivery_mode=delivery_mode,
            message_id=envelope.id,
            correlation_id=envelope.correlation_id,
            reply_to=envelope.reply_to,
            timestamp=datetime.fromisoformat(envelope.timestamp)
        )
        await exchange.publish(message, routing_key)
        logger.info(f"Published message {envelope.id} to {exchange_name}/{routing_key}")

    async def subscribe(self, queue_name: str, handler: MessageHandler):
        """Register ``handler`` for its message types and start consuming the queue.

        Raises ValueError when the queue has not been declared on this bus.
        """
        queue = self.queues.get(queue_name)
        if not queue:
            raise ValueError(f"Queue {queue_name} not found")
        # Register the handler. The registry is shared by all queues, so the
        # same handler subscribed to several queues is stored only once;
        # otherwise it would run several times for every message.
        for message_type in handler.get_message_types():
            registered = self.handlers.setdefault(message_type, [])
            if handler not in registered:
                registered.append(handler)
        # Start consuming.
        await queue.consume(self._message_callback)
        logger.info(f"Subscribed to queue {queue_name}")

    async def _message_callback(self, message: aio_pika.IncomingMessage):
        """Decode an incoming message and dispatch it to the handlers for its type.

        All exceptions are caught and logged inside the ``message.process()``
        context, so no error propagates out of it.
        """
        async with message.process():
            try:
                # Parse the message.
                message_data = json.loads(message.body.decode())
                envelope = MessageEnvelope.from_dict(message_data)
                logger.info(f"Received message {envelope.id} of type {envelope.type}")
                # Look up handlers.
                handlers = self.handlers.get(envelope.type, [])
                if not handlers:
                    logger.warning(f"No handlers for message type {envelope.type}")
                    return
                # Run each handler; one failing handler does not stop the others.
                for handler in handlers:
                    try:
                        response = await handler.handle(envelope)
                        # Forward the response only when the sender asked for one.
                        if response and envelope.reply_to:
                            await self._send_reply(response, envelope.reply_to)
                    except Exception as e:
                        logger.error(f"Handler {handler.__class__.__name__} failed: {e}")
                        # Retry logic could be added here.
            except Exception as e:
                logger.error(f"Failed to process message: {e}")
                # A failed message could be routed to a dead-letter queue here.

    async def _send_reply(self, response: MessageEnvelope, reply_to: str):
        """Publish ``response`` to the address in ``reply_to`` (best effort).

        ``reply_to`` is expected in the form ``"exchange/routing_key"``.
        Failures are logged, never raised.
        """
        try:
            parts = reply_to.split('/', 1)
            if len(parts) == 2:
                exchange_name, routing_key = parts
                await self.publish(exchange_name, routing_key, response)
            else:
                logger.error(f"Invalid reply_to format: {reply_to}")
        except Exception as e:
            logger.error(f"Failed to send reply: {e}")
# Concrete message handler implementations
class UserEventHandler(MessageHandler):
    """Handles user.created, user.updated and user.deleted events."""

    def __init__(self, service_name: str):
        self.service_name = service_name

    async def handle(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """Dispatch on ``envelope.type``; unknown types are logged and ignored."""
        logger.info(f"[{self.service_name}] Processing {envelope.type} event")
        if envelope.type == "user.created":
            return await self._handle_user_created(envelope)
        elif envelope.type == "user.updated":
            return await self._handle_user_updated(envelope)
        elif envelope.type == "user.deleted":
            return await self._handle_user_deleted(envelope)
        else:
            logger.warning(f"Unknown event type: {envelope.type}")
            return None

    def get_message_types(self) -> list[str]:
        return ["user.created", "user.updated", "user.deleted"]

    async def _handle_user_created(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """Handle a user-created event and return an ``email.welcome`` envelope."""
        # The payload is optional on the envelope; treat a missing one as empty.
        user_data = envelope.data or {}
        user_id = user_data.get("id")
        username = user_data.get("username")
        email = user_data.get("email")
        logger.info(f"[{self.service_name}] User created: {username} ({email})")
        # Simulated business processing.
        await asyncio.sleep(0.1)
        # Build the welcome-email event for the caller to forward.
        welcome_event = MessageEnvelope(
            id=f"welcome_{user_id}_{datetime.now().isoformat()}",
            type="email.welcome",
            source=self.service_name,
            timestamp=datetime.now().isoformat(),
            data={
                "user_id": user_id,
                "username": username,
                "email": email,
                "template": "welcome"
            }
        )
        return welcome_event

    async def _handle_user_updated(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """Handle a user-updated event; produces no response."""
        user_data = envelope.data or {}
        user_id = user_data.get("id")
        logger.info(f"[{self.service_name}] User updated: {user_id}")
        # Simulated business processing.
        await asyncio.sleep(0.1)
        return None

    async def _handle_user_deleted(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """Handle a user-deleted event; produces no response."""
        user_data = envelope.data or {}
        user_id = user_data.get("id")
        logger.info(f"[{self.service_name}] User deleted: {user_id}")
        # Simulated cleanup of related data.
        await asyncio.sleep(0.1)
        return None
class EmailHandler(MessageHandler):
    """Handles email.welcome and email.notification events."""

    def __init__(self, service_name: str):
        self.service_name = service_name

    async def handle(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """Dispatch on ``envelope.type``; unknown types are logged and ignored."""
        logger.info(f"[{self.service_name}] Processing {envelope.type} event")
        if envelope.type == "email.welcome":
            return await self._send_welcome_email(envelope)
        elif envelope.type == "email.notification":
            return await self._send_notification_email(envelope)
        else:
            logger.warning(f"Unknown email type: {envelope.type}")
            return None

    def get_message_types(self) -> list[str]:
        return ["email.welcome", "email.notification"]

    async def _send_welcome_email(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """Send (simulated) welcome email; produces no response."""
        # The payload is optional on the envelope; treat a missing one as empty.
        email_data = envelope.data or {}
        username = email_data.get("username")
        email = email_data.get("email")
        logger.info(f"[{self.service_name}] Sending welcome email to {username} ({email})")
        # Simulated send.
        await asyncio.sleep(0.2)
        logger.info(f"[{self.service_name}] Welcome email sent successfully")
        return None

    async def _send_notification_email(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """Send (simulated) notification email; produces no response."""
        email_data = envelope.data or {}
        subject = email_data.get("subject")
        recipient = email_data.get("recipient")
        logger.info(f"[{self.service_name}] Sending notification '{subject}' to {recipient}")
        # Simulated send.
        await asyncio.sleep(0.2)
        logger.info(f"[{self.service_name}] Notification email sent successfully")
        return None
# Usage example
async def main():
    """Wire up exchanges, queues and handlers, then publish two user events."""
    # Create the message bus.
    message_bus = MessageBus("amqp://guest:guest@localhost/")
    try:
        # Connect to the broker.
        await message_bus.connect()
        # Declare exchanges and queues.
        await message_bus.declare_exchange("user.events", "topic")
        await message_bus.declare_exchange("email.events", "topic")
        await message_bus.declare_queue("user.service.queue")
        await message_bus.declare_queue("email.service.queue")
        # Bind queues.
        await message_bus.bind_queue("user.service.queue", "user.events", "user.*")
        await message_bus.bind_queue("email.service.queue", "email.events", "email.*")
        # Create handlers.
        user_handler = UserEventHandler("user-service")
        email_handler = EmailHandler("email-service")
        # Subscribe.
        await message_bus.subscribe("user.service.queue", user_handler)
        await message_bus.subscribe("email.service.queue", email_handler)
        # Give the subscriptions a moment to become active.
        await asyncio.sleep(1)
        # Publish a user-created event. reply_to uses the bus's
        # "exchange/routing_key" convention: the bus forwards a handler's
        # response only when reply_to is set, so without it the welcome
        # event produced by UserEventHandler would be dropped and the email
        # handler would never run.
        user_created_event = MessageEnvelope(
            id=f"user_created_{datetime.now().isoformat()}",
            type="user.created",
            source="user-api",
            timestamp=datetime.now().isoformat(),
            reply_to="email.events/email.welcome",
            data={
                "id": "user123",
                "username": "john_doe",
                "email": "john@example.com",
                "status": "active"
            }
        )
        await message_bus.publish("user.events", "user.created", user_created_event)
        # Wait for processing.
        await asyncio.sleep(2)
        # Publish a user-updated event (no response expected, so no reply_to).
        user_updated_event = MessageEnvelope(
            id=f"user_updated_{datetime.now().isoformat()}",
            type="user.updated",
            source="user-api",
            timestamp=datetime.now().isoformat(),
            data={
                "id": "user123",
                "username": "john_smith",
                "email": "john.smith@example.com",
                "status": "active"
            }
        )
        await message_bus.publish("user.events", "user.updated", user_updated_event)
        # Wait for processing.
        await asyncio.sleep(2)
        logger.info("Message processing completed")
    except Exception as e:
        logger.error(f"Error in main: {e}")
    finally:
        await message_bus.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
事件流处理
// 基于Apache Kafka的事件流处理
const { Kafka, logLevel } = require('kafkajs');
const { v4: uuidv4 } = require('uuid');
// Kafka configuration: one shared client instance used by every producer and
// consumer created in this file.
const kafka = new Kafka({
  clientId: 'microservice-app',
  brokers: ['localhost:9092'],
  logLevel: logLevel.INFO,
  // Retry settings passed through to kafkajs.
  retry: {
    initialRetryTime: 100,
    retries: 8
  }
});
// Base event: an id, a type, a source, a creation timestamp, a payload and
// free-form metadata.
class Event {
  /**
   * @param {string} type     event type name
   * @param {string} source   name of the emitting component
   * @param {*} data          event payload
   * @param {object} metadata optional extra attributes (defaults to {})
   */
  constructor(type, source, data, metadata = {}) {
    Object.assign(this, {
      id: uuidv4(),
      type,
      source,
      timestamp: new Date().toISOString(),
      data,
      metadata
    });
  }

  /** Returns a plain-object copy of the event's six fields. */
  toJSON() {
    const { id, type, source, timestamp, data, metadata } = this;
    return { id, type, source, timestamp, data, metadata };
  }

  /** Rebuilds an Event from a plain object, keeping its id and timestamp. */
  static fromJSON(json) {
    const restored = new Event(json.type, json.source, json.data, json.metadata);
    restored.id = json.id;
    restored.timestamp = json.timestamp;
    return restored;
  }
}
// Event publisher: wraps a kafkajs producer and tracks whether it is connected.
class EventPublisher {
  constructor(kafka) {
    const producerOptions = {
      maxInFlightRequests: 1,
      idempotent: true,
      transactionTimeout: 30000
    };
    this.producer = kafka.producer(producerOptions);
    this.connected = false;
  }

  /** Connects the producer; does nothing when already connected. */
  async connect() {
    if (this.connected) {
      return;
    }
    await this.producer.connect();
    this.connected = true;
    console.log('Event publisher connected');
  }

  /** Disconnects the producer; does nothing when not connected. */
  async disconnect() {
    if (!this.connected) {
      return;
    }
    await this.producer.disconnect();
    this.connected = false;
    console.log('Event publisher disconnected');
  }

  /**
   * Publishes one event to `topic`, keyed by the event id. When `partition`
   * is not null it is set on the message. Send errors are logged and rethrown.
   */
  async publish(topic, event, partition = null) {
    if (!this.connected) {
      throw new Error('Publisher not connected');
    }
    const headers = {
      'event-type': event.type,
      'event-source': event.source,
      'event-timestamp': event.timestamp
    };
    const message = {
      key: event.id,
      value: JSON.stringify(event.toJSON()),
      headers,
      ...(partition !== null ? { partition } : {})
    };
    let result;
    try {
      result = await this.producer.send({ topic, messages: [message] });
    } catch (error) {
      console.error(`Failed to publish event ${event.id}:`, error);
      throw error;
    }
    console.log(`Published event ${event.id} to topic ${topic}:`, result);
    return result;
  }

  /**
   * Publishes several events to `topic` in a single send call.
   * Send errors are logged and rethrown.
   */
  async publishBatch(topic, events) {
    if (!this.connected) {
      throw new Error('Publisher not connected');
    }
    const messages = [];
    for (const event of events) {
      messages.push({
        key: event.id,
        value: JSON.stringify(event.toJSON()),
        headers: {
          'event-type': event.type,
          'event-source': event.source,
          'event-timestamp': event.timestamp
        }
      });
    }
    let result;
    try {
      result = await this.producer.send({ topic, messages });
    } catch (error) {
      console.error(`Failed to publish batch events:`, error);
      throw error;
    }
    console.log(`Published ${events.length} events to topic ${topic}`);
    return result;
  }
}
// Event handler interface: subclasses override handle and getEventTypes.
class EventHandler {
  /** @param {string} name label used in log output */
  constructor(name) {
    this.name = name;
  }

  /** Processes one event. Rejects unless overridden by a subclass. */
  async handle(event) {
    const notImplemented = new Error('handle method must be implemented');
    throw notImplemented;
  }

  /** Returns the event type names handled. Throws unless overridden. */
  getEventTypes() {
    const notImplemented = new Error('getEventTypes method must be implemented');
    throw notImplemented;
  }

  /** Error hook; the default implementation only logs the error. */
  async onError(error, event) {
    console.error(`Handler ${this.name} error:`, error);
  }
}
// Event consumer: wraps a kafkajs consumer and dispatches each decoded event
// to the handlers registered for its type.
class EventConsumer {
  constructor(kafka, groupId) {
    const consumerOptions = {
      groupId,
      sessionTimeout: 30000,
      rebalanceTimeout: 60000,
      heartbeatInterval: 3000,
      maxBytesPerPartition: 1048576,
      minBytes: 1,
      maxBytes: 10485760,
      maxWaitTimeInMs: 5000
    };
    this.consumer = kafka.consumer(consumerOptions);
    this.handlers = new Map(); // event type -> array of handlers
    this.connected = false;
    this.running = false;
  }

  /** Connects the consumer; does nothing when already connected. */
  async connect() {
    if (this.connected) {
      return;
    }
    await this.consumer.connect();
    this.connected = true;
    console.log('Event consumer connected');
  }

  /** Stops dispatching and disconnects; does nothing when not connected. */
  async disconnect() {
    if (!this.connected) {
      return;
    }
    this.running = false;
    await this.consumer.disconnect();
    this.connected = false;
    console.log('Event consumer disconnected');
  }

  /** Registers `handler` under every event type it reports. */
  addHandler(handler) {
    const eventTypes = handler.getEventTypes();
    eventTypes.forEach((eventType) => {
      const bucket = this.handlers.get(eventType);
      if (bucket === undefined) {
        this.handlers.set(eventType, [handler]);
      } else {
        bucket.push(handler);
      }
    });
    console.log(`Added handler ${handler.name} for events: ${eventTypes.join(', ')}`);
  }

  /** Subscribes to one topic or an array of topics, not from the beginning. */
  async subscribe(topics) {
    if (!this.connected) {
      throw new Error('Consumer not connected');
    }
    const topicList = Array.isArray(topics) ? topics : [topics];
    await this.consumer.subscribe({ topics: topicList, fromBeginning: false });
    console.log(`Subscribed to topics: ${Array.isArray(topics) ? topics.join(', ') : topics}`);
  }

  /**
   * Starts the consume loop. For each message the event is parsed and all
   * handlers for its type run concurrently. A failing handler is reported to
   * its own onError; any other failure is logged.
   */
  async start() {
    if (!this.connected) {
      throw new Error('Consumer not connected');
    }
    this.running = true;

    const dispatch = async ({ topic, partition, message }) => {
      if (!this.running) {
        return;
      }
      try {
        const event = Event.fromJSON(JSON.parse(message.value.toString()));
        console.log(`Received event ${event.id} of type ${event.type} from topic ${topic}`);

        // Look up handlers for this event type.
        const matching = this.handlers.get(event.type) || [];
        if (matching.length === 0) {
          console.warn(`No handlers for event type ${event.type}`);
          return;
        }

        // Run all handlers concurrently.
        await Promise.all(matching.map(async (handler) => {
          try {
            await handler.handle(event);
          } catch (error) {
            await handler.onError(error, event);
          }
        }));
      } catch (error) {
        console.error('Failed to process message:', error);
      }
    };

    await this.consumer.run({ eachMessage: dispatch });
    console.log('Event consumer started');
  }

  /** Stops dispatching and stops the underlying consumer. */
  async stop() {
    this.running = false;
    await this.consumer.stop();
    console.log('Event consumer stopped');
  }
}
// Concrete event handler implementations
class UserEventHandler extends EventHandler {
  /**
   * @param {EventPublisher|null} publisher optional, already-connected
   *   publisher to reuse for follow-up events. When omitted, a temporary
   *   publisher is created (and always disconnected) per user.created event.
   */
  constructor(publisher = null) {
    super('UserEventHandler');
    this.publisher = publisher;
  }

  getEventTypes() {
    return ['user.created', 'user.updated', 'user.deleted'];
  }

  /** Dispatches on event.type; unknown types are logged and ignored. */
  async handle(event) {
    console.log(`[${this.name}] Processing ${event.type} event`);
    switch (event.type) {
      case 'user.created':
        await this.handleUserCreated(event);
        break;
      case 'user.updated':
        await this.handleUserUpdated(event);
        break;
      case 'user.deleted':
        await this.handleUserDeleted(event);
        break;
      default:
        console.warn(`Unknown event type: ${event.type}`);
    }
  }

  /** Handles user.created and publishes an email.welcome event to 'email-events'. */
  async handleUserCreated(event) {
    const { id, username, email } = event.data;
    console.log(`[${this.name}] User created: ${username} (${email})`);
    // Simulated business processing.
    await new Promise(resolve => setTimeout(resolve, 100));

    // Build the welcome-email event.
    const welcomeEvent = new Event(
      'email.welcome',
      'user-service',
      {
        userId: id,
        username,
        email,
        template: 'welcome'
      }
    );

    // Prefer the injected publisher: no connect/disconnect per event.
    if (this.publisher) {
      await this.publisher.publish('email-events', welcomeEvent);
      return;
    }

    // Fallback: a one-off publisher. try/finally guarantees the producer is
    // disconnected even when publish throws.
    const publisher = new EventPublisher(kafka);
    try {
      await publisher.connect();
      await publisher.publish('email-events', welcomeEvent);
    } finally {
      await publisher.disconnect();
    }
  }

  /** Handles user.updated (simulated processing only). */
  async handleUserUpdated(event) {
    const { id } = event.data;
    console.log(`[${this.name}] User updated: ${id}`);
    // Simulated business processing.
    await new Promise(resolve => setTimeout(resolve, 100));
  }

  /** Handles user.deleted (simulated cleanup only). */
  async handleUserDeleted(event) {
    const { id } = event.data;
    console.log(`[${this.name}] User deleted: ${id}`);
    // Simulated cleanup of related data.
    await new Promise(resolve => setTimeout(resolve, 100));
  }
}
// Handler for email.welcome and email.notification events.
class EmailEventHandler extends EventHandler {
  constructor() {
    super('EmailEventHandler');
  }

  getEventTypes() {
    return ['email.welcome', 'email.notification'];
  }

  /** Dispatches on event.type; unknown types are logged and ignored. */
  async handle(event) {
    console.log(`[${this.name}] Processing ${event.type} event`);
    const kind = event.type;
    if (kind === 'email.welcome') {
      await this.sendWelcomeEmail(event);
    } else if (kind === 'email.notification') {
      await this.sendNotificationEmail(event);
    } else {
      console.warn(`Unknown event type: ${event.type}`);
    }
  }

  /** Sends (simulated) welcome email. */
  async sendWelcomeEmail(event) {
    const payload = event.data;
    const username = payload.username;
    const email = payload.email;
    console.log(`[${this.name}] Sending welcome email to ${username} (${email})`);
    // Simulated send.
    await new Promise((done) => setTimeout(done, 200));
    console.log(`[${this.name}] Welcome email sent successfully`);
  }

  /** Sends (simulated) notification email. */
  async sendNotificationEmail(event) {
    const payload = event.data;
    const subject = payload.subject;
    const recipient = payload.recipient;
    console.log(`[${this.name}] Sending notification '${subject}' to ${recipient}`);
    // Simulated send.
    await new Promise((done) => setTimeout(done, 200));
    console.log(`[${this.name}] Notification email sent successfully`);
  }
}
// Event stream processor: owns one publisher plus one consumer per service.
class EventStreamProcessor {
  constructor(kafka) {
    this.kafka = kafka;
    this.publisher = new EventPublisher(kafka);
    this.consumers = new Map(); // short name -> EventConsumer
  }

  /** Connects the publisher, then starts the user and email consumers. */
  async start() {
    await this.publisher.connect();

    // Each entry: registry key, consumer group, topics, handler factory.
    const plan = [
      ['user', 'user-service-group', ['user-events'], () => new UserEventHandler()],
      ['email', 'email-service-group', ['email-events'], () => new EmailEventHandler()]
    ];
    for (const [key, groupId, topics, makeHandler] of plan) {
      const consumer = new EventConsumer(this.kafka, groupId);
      await consumer.connect();
      await consumer.subscribe(topics);
      consumer.addHandler(makeHandler());
      await consumer.start();
      this.consumers.set(key, consumer);
    }

    console.log('Event stream processor started');
  }

  /** Stops and disconnects every consumer, then disconnects the publisher. */
  async stop() {
    for (const consumer of this.consumers.values()) {
      await consumer.stop();
      await consumer.disconnect();
    }
    await this.publisher.disconnect();
    console.log('Event stream processor stopped');
  }

  /** Publishes a user event of `type` to 'user-events' and returns it. */
  async publishUserEvent(type, userData) {
    const userEvent = new Event(type, 'user-api', userData);
    await this.publisher.publish('user-events', userEvent);
    return userEvent;
  }
}
// Usage example
async function main() {
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  const processor = new EventStreamProcessor(kafka);

  try {
    await processor.start();
    // Give the consumers time to start.
    await pause(2000);

    // Publish a user-created event.
    const created = {
      id: 'user123',
      username: 'john_doe',
      email: 'john@example.com',
      status: 'active'
    };
    await processor.publishUserEvent('user.created', created);
    // Wait for processing.
    await pause(3000);

    // Publish a user-updated event.
    const updated = {
      id: 'user123',
      username: 'john_smith',
      email: 'john.smith@example.com',
      status: 'active'
    };
    await processor.publishUserEvent('user.updated', updated);
    // Wait for processing.
    await pause(3000);

    console.log('Event processing completed');
  } catch (error) {
    console.error('Error in main:', error);
  } finally {
    await processor.stop();
  }
}
// Run the demo only when this file is executed directly, not when required.
if (require.main === module) {
  main().catch(console.error);
}

// Public API of this module.
module.exports = {
  Event,
  EventPublisher,
  EventHandler,
  EventConsumer,
  EventStreamProcessor,
  UserEventHandler,
  EmailEventHandler
};
3.3 服务发现与负载均衡
服务注册与发现
# Consul service-discovery configuration
apiVersion: v1
kind: ConfigMap
metadata:
  name: consul-config
  namespace: microservices
data:
  # Consul agent configuration, stored as a JSON document under consul.json.
  # bootstrap_expect is 3 and retry_join lists three consul-N hosts of the
  # "consul" service in the "microservices" namespace.
  # NOTE(review): acl.enabled is true while default_policy is "allow" -
  # confirm this is the intended access policy.
  consul.json: |
    {
      "datacenter": "dc1",
      "data_dir": "/consul/data",
      "log_level": "INFO",
      "server": true,
      "bootstrap_expect": 3,
      "bind_addr": "0.0.0.0",
      "client_addr": "0.0.0.0",
      "retry_join": [
        "consul-0.consul.microservices.svc.cluster.local",
        "consul-1.consul.microservices.svc.cluster.local",
        "consul-2.consul.microservices.svc.cluster.local"
      ],
      "ui_config": {
        "enabled": true
      },
      "connect": {
        "enabled": true
      },
      "ports": {
        "grpc": 8502
      },
      "acl": {
        "enabled": true,
        "default_policy": "allow",
        "enable_token_persistence": true
      }
    }
---
# Consul server StatefulSet: three replicas governed by the "consul" service,
# each with its own 10Gi data volume.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: consul
  namespace: microservices
spec:
  serviceName: consul
  # Matches bootstrap_expect (3) in the consul-config ConfigMap.
  replicas: 3
  selector:
    matchLabels:
      app: consul
  template:
    metadata:
      labels:
        app: consul
    spec:
      containers:
        - name: consul
          image: consul:1.15.2
          # NOTE(review): the gRPC port 8502 set in consul.json is not listed
          # here - confirm whether it should be.
          ports:
            - containerPort: 8500
              name: ui-port
            - containerPort: 8400
              name: alt-port
            - containerPort: 53
              name: udp-port
            - containerPort: 8443
              name: https-port
            - containerPort: 8080
              name: http-port
            - containerPort: 8301
              name: serflan
            - containerPort: 8302
              name: serfwan
            - containerPort: 8600
              name: consuldns
            - containerPort: 8300
              name: server
          # Startup script: create the data and config directories, copy the
          # mounted consul.json into the config directory, then exec the
          # agent through the image's entrypoint script.
          command:
            - "/bin/sh"
            - "-ec"
            - |
              CONSUL_DATA_DIR=/consul/data
              CONSUL_CONFIG_DIR=/consul/config
              mkdir -p ${CONSUL_DATA_DIR}
              mkdir -p ${CONSUL_CONFIG_DIR}
              cp /etc/consul/consul.json ${CONSUL_CONFIG_DIR}/
              exec /usr/local/bin/docker-entrypoint.sh consul agent -config-dir=${CONSUL_CONFIG_DIR}
          volumeMounts:
            # ConfigMap contents (consul.json) appear under /etc/consul.
            - name: consul-config
              mountPath: /etc/consul
            # Per-pod persistent volume from volumeClaimTemplates below.
            - name: consul-data
              mountPath: /consul/data
          env:
            - name: CONSUL_BIND_INTERFACE
              value: eth0
            - name: CONSUL_CLIENT_INTERFACE
              value: eth0
      volumes:
        - name: consul-config
          configMap:
            name: consul-config
  # One PersistentVolumeClaim per replica.
  volumeClaimTemplates:
    - metadata:
        name: consul-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi
---
# Headless service (clusterIP: None) selecting the consul pods. It is the
# service named by the StatefulSet's serviceName and by the retry_join
# hostnames in consul.json.
apiVersion: v1
kind: Service
metadata:
  name: consul
  namespace: microservices
  labels:
    app: consul
spec:
  clusterIP: None
  ports:
    - port: 8500
      targetPort: 8500
      name: ui
    - port: 8400
      targetPort: 8400
      name: alt-port
    - port: 53
      targetPort: 53
      name: udp-port
    - port: 8443
      targetPort: 8443
      name: https-port
    - port: 8080
      targetPort: 8080
      name: http-port
    # Ports 8301, 8302 and 8600 are each exposed over both TCP and UDP.
    - port: 8301
      targetPort: 8301
      name: serflan-tcp
      protocol: "TCP"
    - port: 8301
      targetPort: 8301
      name: serflan-udp
      protocol: "UDP"
    - port: 8302
      targetPort: 8302
      name: serfwan-tcp
      protocol: "TCP"
    - port: 8302
      targetPort: 8302
      name: serfwan-udp
      protocol: "UDP"
    - port: 8300
      targetPort: 8300
      name: server
    - port: 8600
      targetPort: 8600
      name: consuldns-tcp
      protocol: "TCP"
    - port: 8600
      targetPort: 8600
      name: consuldns-udp
      protocol: "UDP"
  selector:
    app: consul
---
# LoadBalancer service mapping port 80 to port 8500 on the consul pods.
# NOTE(review): port 8500 is named "ui" here and is reachable from outside
# the cluster through this service - confirm the exposure is intended given
# the ACL default_policy of "allow" in consul.json.
apiVersion: v1
kind: Service
metadata:
  name: consul-ui
  namespace: microservices
  labels:
    app: consul
spec:
  type: LoadBalancer
  ports:
    - port: 80
      targetPort: 8500
      name: ui
  selector:
    app: consul
// Go语言服务发现客户端实现
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"sync"
"time"
"github.com/hashicorp/consul/api"
)
// ServiceInstance describes one registered instance of a service.
type ServiceInstance struct {
	ID      string            `json:"id"`      // unique instance ID used for register/deregister
	Name    string            `json:"name"`    // service name the instance belongs to
	Address string            `json:"address"` // host used to reach the instance
	Port    int               `json:"port"`    // port used to reach the instance
	Tags    []string          `json:"tags"`    // tags passed through to the registry
	Meta    map[string]string `json:"meta"`    // metadata passed through to the registry
	Health  string            `json:"health"`  // health status; the balancers in this file select only "passing"
}
// ServiceRegistry is the service registration and discovery interface.
type ServiceRegistry interface {
	// Register adds instance to the registry.
	Register(ctx context.Context, instance *ServiceInstance) error
	// Deregister removes the instance with the given ID from the registry.
	Deregister(ctx context.Context, instanceID string) error
	// Discover returns the instances currently known for serviceName.
	Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
	// Watch returns a channel on which instance lists for serviceName are
	// delivered.
	Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error)
	// HealthCheck checks the instance with the given ID.
	HealthCheck(ctx context.Context, instanceID string) error
}
// ConsulRegistry is the Consul-backed implementation of ServiceRegistry.
type ConsulRegistry struct {
	client   *api.Client                         // Consul API client
	config   *api.Config                         // configuration the client was built from
	watchers map[string][]chan []*ServiceInstance // watcher channels keyed by service name
	mutex    sync.RWMutex                        // guards watchers
}
// NewConsulRegistry builds a ConsulRegistry whose client targets address,
// starting from the Consul API's default configuration. It returns a wrapped
// error when the client cannot be created.
func NewConsulRegistry(address string) (*ConsulRegistry, error) {
	cfg := api.DefaultConfig()
	cfg.Address = address

	consulClient, clientErr := api.NewClient(cfg)
	if clientErr != nil {
		return nil, fmt.Errorf("failed to create consul client: %w", clientErr)
	}

	registry := &ConsulRegistry{
		client:   consulClient,
		config:   cfg,
		watchers: map[string][]chan []*ServiceInstance{},
	}
	return registry, nil
}
// Register registers instance with the local Consul agent together with an
// HTTP health check against http://<address>:<port>/health (10s interval,
// 3s timeout, deregistered after 30s in critical state).
// ctx is accepted for interface compatibility and is not used.
func (r *ConsulRegistry) Register(ctx context.Context, instance *ServiceInstance) error {
	healthURL := fmt.Sprintf("http://%s:%d/health", instance.Address, instance.Port)
	check := &api.AgentServiceCheck{
		HTTP:                           healthURL,
		Interval:                       "10s",
		Timeout:                        "3s",
		DeregisterCriticalServiceAfter: "30s",
	}
	reg := &api.AgentServiceRegistration{
		ID:      instance.ID,
		Name:    instance.Name,
		Address: instance.Address,
		Port:    instance.Port,
		Tags:    instance.Tags,
		Meta:    instance.Meta,
		Check:   check,
	}

	if regErr := r.client.Agent().ServiceRegister(reg); regErr != nil {
		return fmt.Errorf("failed to register service: %w", regErr)
	}
	log.Printf("Service registered: %s (%s:%d)", instance.Name, instance.Address, instance.Port)
	return nil
}
// Deregister removes the service instance identified by instanceID from the
// local Consul agent. ctx is accepted for interface compatibility and is not
// used.
func (r *ConsulRegistry) Deregister(ctx context.Context, instanceID string) error {
	agent := r.client.Agent()
	if deregErr := agent.ServiceDeregister(instanceID); deregErr != nil {
		return fmt.Errorf("failed to deregister service: %w", deregErr)
	}
	log.Printf("Service deregistered: %s", instanceID)
	return nil
}
// Discover queries Consul's health endpoint for serviceName (passing-only)
// and converts each entry into a ServiceInstance. An instance's Health is
// "passing" unless one of its checks reports another status, in which case
// the first such status is used.
//
// The request is bound to ctx, so cancelling ctx aborts the query. The
// result is nil when no instance is returned.
func (r *ConsulRegistry) Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error) {
	// Attach ctx to the query so the HTTP call honours cancellation and deadlines.
	opts := (&api.QueryOptions{}).WithContext(ctx)

	services, _, err := r.client.Health().Service(serviceName, "", true, opts)
	if err != nil {
		return nil, fmt.Errorf("failed to discover services: %w", err)
	}
	if len(services) == 0 {
		return nil, nil
	}

	instances := make([]*ServiceInstance, 0, len(services))
	for _, service := range services {
		instance := &ServiceInstance{
			ID:      service.Service.ID,
			Name:    service.Service.Service,
			Address: service.Service.Address,
			Port:    service.Service.Port,
			Tags:    service.Service.Tags,
			Meta:    service.Service.Meta,
			Health:  "passing",
		}
		// Derive the health status from the first non-passing check.
		for _, check := range service.Checks {
			if check.Status != "passing" {
				instance.Health = check.Status
				break
			}
		}
		instances = append(instances, instance)
	}
	return instances, nil
}
// Watch starts a background poller for serviceName and returns a buffered
// channel on which the current instance list is delivered after every
// blocking query that returns.
//
// Each call gets its own poller feeding only its own channel, so several
// watchers of the same service do not receive duplicate updates. The poller
// stops and the channel is unregistered when ctx is done. The channel is not
// closed, so callers should stop reading once ctx is done.
func (r *ConsulRegistry) Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error) {
	ch := make(chan []*ServiceInstance, 10)

	r.mutex.Lock()
	r.watchers[serviceName] = append(r.watchers[serviceName], ch)
	r.mutex.Unlock()

	go func() {
		defer r.removeWatcher(serviceName, ch)
		r.watchService(ctx, serviceName, ch)
	}()
	return ch, nil
}

// removeWatcher unregisters ch from the watcher list of serviceName.
func (r *ConsulRegistry) removeWatcher(serviceName string, ch chan []*ServiceInstance) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	current := r.watchers[serviceName]
	remaining := make([]chan []*ServiceInstance, 0, len(current))
	for _, c := range current {
		if c != ch {
			remaining = append(remaining, c)
		}
	}
	if len(remaining) == 0 {
		delete(r.watchers, serviceName)
		return
	}
	r.watchers[serviceName] = remaining
}

// watchService runs Consul blocking queries for serviceName until ctx is
// done, offering every result to ch without blocking.
func (r *ConsulRegistry) watchService(ctx context.Context, serviceName string, ch chan<- []*ServiceInstance) {
	// Binding ctx to the query lets cancellation interrupt an in-flight
	// blocking request instead of waiting out the 30s WaitTime.
	queryOptions := (&api.QueryOptions{WaitTime: 30 * time.Second}).WithContext(ctx)

	for {
		if ctx.Err() != nil {
			return
		}

		services, meta, err := r.client.Health().Service(serviceName, "", true, queryOptions)
		if err != nil {
			if ctx.Err() != nil {
				return // the query failed because the watch was cancelled
			}
			log.Printf("Watch error for service %s: %v", serviceName, err)
			// Back off before retrying, but wake up early on cancellation.
			select {
			case <-ctx.Done():
				return
			case <-time.After(5 * time.Second):
			}
			continue
		}

		// Consul recommends restarting from index 0 when the returned index
		// moves backwards.
		if meta.LastIndex < queryOptions.WaitIndex {
			queryOptions.WaitIndex = 0
		} else {
			queryOptions.WaitIndex = meta.LastIndex
		}

		instances := make([]*ServiceInstance, 0, len(services))
		for _, service := range services {
			instance := &ServiceInstance{
				ID:      service.Service.ID,
				Name:    service.Service.Service,
				Address: service.Service.Address,
				Port:    service.Service.Port,
				Tags:    service.Service.Tags,
				Meta:    service.Service.Meta,
				Health:  "passing",
			}
			for _, check := range service.Checks {
				if check.Status != "passing" {
					instance.Health = check.Status
					break
				}
			}
			instances = append(instances, instance)
		}

		// Notify the watcher; drop the update if its buffer is full.
		select {
		case ch <- instances:
		default:
		}
	}
}
// HealthCheck is a placeholder: it performs no check and always returns nil.
func (r *ConsulRegistry) HealthCheck(ctx context.Context, instanceID string) error {
	// TODO: implement the health check logic.
	return nil
}
// LoadBalancer is the load-balancing strategy interface.
type LoadBalancer interface {
	// Select picks one instance out of instances, or returns an error when
	// none can be chosen.
	Select(instances []*ServiceInstance) (*ServiceInstance, error)
	// UpdateInstances hands the balancer a new instance list.
	UpdateInstances(instances []*ServiceInstance)
}
// RoundRobinBalancer is a round-robin load balancer.
type RoundRobinBalancer struct {
	instances []*ServiceInstance // list stored by UpdateInstances
	current   int                // rotation cursor advanced by Select
	mutex     sync.RWMutex       // guards instances and current
}
func NewRoundRobinBalancer() *RoundRobinBalancer {
return &RoundRobinBalancer{
instances: make([]*ServiceInstance, 0),
current: 0,
}
}
func (lb *RoundRobinBalancer) Select(instances []*ServiceInstance) (*ServiceInstance, error) {
lb.mutex.Lock()
defer lb.mutex.Unlock()
if len(instances) == 0 {
return nil, fmt.Errorf("no available instances")
}
// 过滤健康的实例
healthyInstances := make([]*ServiceInstance, 0)
for _, instance := range instances {
if instance.Health == "passing" {
healthyInstances = append(healthyInstances, instance)
}
}
if len(healthyInstances) == 0 {
return nil, fmt.Errorf("no healthy instances")
}
instance := healthyInstances[lb.current%len(healthyInstances)]
lb.current++
return instance, nil
}
func (lb *RoundRobinBalancer) UpdateInstances(instances []*ServiceInstance) {
lb.mutex.Lock()
defer lb.mutex.Unlock()
lb.instances = instances
if lb.current >= len(instances) {
lb.current = 0
}
}
// RandomBalancer is a random load balancer: every Select call returns a
// uniformly chosen healthy instance.
//
// A *rand.Rand is not safe for concurrent use, so access to it is serialised
// with a mutex. The zero value is usable; the random source is created lazily
// on first use.
type RandomBalancer struct {
	mutex sync.Mutex // guards rand
	rand  *rand.Rand
}

// NewRandomBalancer returns a balancer whose random source is seeded from the
// current time.
func NewRandomBalancer() *RandomBalancer {
	return &RandomBalancer{
		rand: rand.New(rand.NewSource(time.Now().UnixNano())),
	}
}

// Select returns a randomly chosen healthy instance (Health == "passing")
// from instances.
//
// It returns an error when instances is empty ("no available instances") or
// when none of them is healthy ("no healthy instances").
func (lb *RandomBalancer) Select(instances []*ServiceInstance) (*ServiceInstance, error) {
	if len(instances) == 0 {
		return nil, fmt.Errorf("no available instances")
	}

	// Keep only the healthy instances.
	healthy := make([]*ServiceInstance, 0, len(instances))
	for _, inst := range instances {
		if inst.Health == "passing" {
			healthy = append(healthy, inst)
		}
	}
	if len(healthy) == 0 {
		return nil, fmt.Errorf("no healthy instances")
	}

	lb.mutex.Lock()
	defer lb.mutex.Unlock()
	if lb.rand == nil {
		// Zero-value balancer: create the source on first use.
		lb.rand = rand.New(rand.NewSource(time.Now().UnixNano()))
	}
	return healthy[lb.rand.Intn(len(healthy))], nil
}

// UpdateInstances is a no-op: the random balancer keeps no per-instance state.
func (lb *RandomBalancer) UpdateInstances(instances []*ServiceInstance) {
}
// WeightedRoundRobinBalancer is a smooth weighted round-robin load balancer.
// Each instance's weight is read from its Meta["weight"] entry.
type WeightedRoundRobinBalancer struct {
	instances []*ServiceInstance // last list handed to UpdateInstances
	weights   []int              // weights parsed by UpdateInstances, parallel to instances
	current   []int              // running "current weight" per position, used by Select
	mutex     sync.RWMutex       // guards all fields above
}

// NewWeightedRoundRobinBalancer returns a balancer with no instances and no
// accumulated selection state.
func NewWeightedRoundRobinBalancer() *WeightedRoundRobinBalancer {
	return &WeightedRoundRobinBalancer{
		instances: make([]*ServiceInstance, 0),
		weights:   make([]int, 0),
		current:   make([]int, 0),
	}
}

// weightFromMeta reads the "weight" entry of an instance's metadata.
// A missing, unparsable or negative value yields the default weight 1.
// A weight of 0 is kept as is.
func weightFromMeta(meta map[string]string) int {
	raw, ok := meta["weight"]
	if !ok {
		return 1
	}
	var w int
	if n, err := fmt.Sscanf(raw, "%d", &w); err != nil || n != 1 || w < 0 {
		return 1
	}
	return w
}

// Select returns the next healthy instance (Health == "passing") according to
// smooth weighted round-robin.
//
// It returns an error when instances is empty ("no available instances") or
// when none of them is healthy ("no healthy instances").
//
// Weight handling:
//   - negative or invalid weights count as 1, because a negative weight would
//     make the total inconsistent and could starve healthy peers;
//   - an instance with weight 0 is not selected while a peer has a positive
//     weight;
//   - if every healthy instance has weight 0, all are treated as weight 1 so
//     traffic is still spread instead of always hitting the first instance.
//
// The running state is positional and is re-initialised whenever the number
// of healthy instances changes.
func (lb *WeightedRoundRobinBalancer) Select(instances []*ServiceInstance) (*ServiceInstance, error) {
	if len(instances) == 0 {
		return nil, fmt.Errorf("no available instances")
	}

	// Keep the healthy instances and collect their weights.
	healthy := make([]*ServiceInstance, 0, len(instances))
	weights := make([]int, 0, len(instances))
	totalWeight := 0
	for _, inst := range instances {
		if inst.Health != "passing" {
			continue
		}
		w := weightFromMeta(inst.Meta)
		healthy = append(healthy, inst)
		weights = append(weights, w)
		totalWeight += w
	}
	if len(healthy) == 0 {
		return nil, fmt.Errorf("no healthy instances")
	}
	if totalWeight == 0 {
		// All weights are zero: fall back to equal weights.
		for i := range weights {
			weights[i] = 1
		}
		totalWeight = len(weights)
	}

	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	// (Re)initialise the running weights when the healthy set changed size.
	if len(lb.current) != len(healthy) {
		lb.current = make([]int, len(healthy))
		copy(lb.current, weights)
	}

	// Pick the position with the largest running weight (first one wins ties).
	best := 0
	for i := 1; i < len(lb.current); i++ {
		if lb.current[i] > lb.current[best] {
			best = i
		}
	}

	// Penalise the winner by the total weight, then give every position its
	// own weight back in preparation for the next call.
	lb.current[best] -= totalWeight
	for i := range lb.current {
		lb.current[i] += weights[i]
	}

	return healthy[best], nil
}

// UpdateInstances stores the latest instance list and resets the weights and
// running state to the weights parsed from each instance's metadata.
func (lb *WeightedRoundRobinBalancer) UpdateInstances(instances []*ServiceInstance) {
	weights := make([]int, len(instances))
	for i, inst := range instances {
		weights[i] = weightFromMeta(inst.Meta)
	}
	current := make([]int, len(weights))
	copy(current, weights)

	lb.mutex.Lock()
	defer lb.mutex.Unlock()

	lb.instances = instances
	lb.weights = weights
	lb.current = current
}
// ServiceClient is a service client: it discovers the instances of one named
// service through a ServiceRegistry, keeps that list up to date, and sends
// HTTP requests to an instance chosen by a LoadBalancer.
type ServiceClient struct {
	registry     ServiceRegistry
	loadBalancer LoadBalancer
	httpClient   *http.Client
	serviceName  string
	instances    []*ServiceInstance // latest known instances; guarded by mutex
	mutex        sync.RWMutex
}

// NewServiceClient returns a client for serviceName that uses the given
// registry and load balancer and an HTTP client with a 30 second timeout.
// Call Start before Call so that the instance list is populated.
func NewServiceClient(registry ServiceRegistry, loadBalancer LoadBalancer, serviceName string) *ServiceClient {
	return &ServiceClient{
		registry:     registry,
		loadBalancer: loadBalancer,
		httpClient: &http.Client{
			Timeout: 30 * time.Second,
		},
		serviceName: serviceName,
		instances:   make([]*ServiceInstance, 0),
	}
}

// Start performs the initial discovery of the service and then follows
// registry updates in a background goroutine.
//
// The goroutine stops when ctx is cancelled or when the watch channel is
// closed. An error is returned if the initial discovery or the watch
// registration fails.
func (c *ServiceClient) Start(ctx context.Context) error {
	// Initial discovery.
	instances, err := c.registry.Discover(ctx, c.serviceName)
	if err != nil {
		return fmt.Errorf("failed to discover service: %w", err)
	}
	c.updateInstances(instances)

	// Follow subsequent changes.
	updates, err := c.registry.Watch(ctx, c.serviceName)
	if err != nil {
		return fmt.Errorf("failed to watch service: %w", err)
	}

	go func() {
		for {
			select {
			case latest, ok := <-updates:
				if !ok {
					// A closed channel would otherwise yield nil forever and
					// spin this loop.
					return
				}
				c.updateInstances(latest)
			case <-ctx.Done():
				return
			}
		}
	}()

	return nil
}

// updateInstances replaces the cached instance list, forwards it to the load
// balancer and logs the new instance count.
func (c *ServiceClient) updateInstances(instances []*ServiceInstance) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.instances = instances
	c.loadBalancer.UpdateInstances(instances)
	log.Printf("Updated service instances for %s: %d instances", c.serviceName, len(instances))
}

// Call sends an HTTP request with the given method and path to an instance
// chosen by the load balancer.
//
// When body is non-nil it is JSON-encoded, sent as the request body, and the
// Content-Type header is set to "application/json". When body is nil the
// request carries no body.
//
// On success the caller owns the response and must close resp.Body. The HTTP
// status code is not inspected. Errors from instance selection, JSON
// encoding, request construction and transport are wrapped and returned.
func (c *ServiceClient) Call(ctx context.Context, method, path string, body interface{}) (*http.Response, error) {
	c.mutex.RLock()
	instances := c.instances
	c.mutex.RUnlock()

	instance, err := c.loadBalancer.Select(instances)
	if err != nil {
		return nil, fmt.Errorf("failed to select instance: %w", err)
	}

	target := fmt.Sprintf("http://%s:%d%s", instance.Address, instance.Port, path)

	var req *http.Request
	if body != nil {
		payload, err := json.Marshal(body)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal request body: %w", err)
		}
		// Pass the payload to the request; previously it was encoded but
		// never attached, so the server received an empty body.
		req, err = http.NewRequestWithContext(ctx, method, target, bytes.NewReader(payload))
		if err != nil {
			return nil, fmt.Errorf("failed to create request: %w", err)
		}
		req.Header.Set("Content-Type", "application/json")
	} else {
		req, err = http.NewRequestWithContext(ctx, method, target, nil)
		if err != nil {
			return nil, fmt.Errorf("failed to create request: %w", err)
		}
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("request failed: %w", err)
	}
	return resp, nil
}
// Usage example: registers one "user-service" instance in Consul, calls it
// through a ServiceClient with weighted round-robin balancing, and
// deregisters the instance again.
func main() {
	// Cancelling the context on return stops the background watch goroutines
	// started by client.Start.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create the Consul registry.
	registry, err := NewConsulRegistry("localhost:8500")
	if err != nil {
		log.Fatalf("Failed to create registry: %v", err)
	}

	// Register the service instance.
	instance := &ServiceInstance{
		ID:      "user-service-1",
		Name:    "user-service",
		Address: "localhost",
		Port:    8080,
		Tags:    []string{"api", "v1"},
		Meta: map[string]string{
			"version": "1.0.0",
			"weight":  "10",
		},
	}
	if err := registry.Register(ctx, instance); err != nil {
		log.Fatalf("Failed to register service: %v", err)
	}

	// deregister removes the instance from the registry. It is invoked
	// explicitly on every exit path after registration because log.Fatalf
	// terminates the process without running deferred calls.
	deregister := func() {
		if err := registry.Deregister(ctx, instance.ID); err != nil {
			log.Printf("Failed to deregister service: %v", err)
		}
	}

	// Create the service client.
	loadBalancer := NewWeightedRoundRobinBalancer()
	client := NewServiceClient(registry, loadBalancer, "user-service")
	if err := client.Start(ctx); err != nil {
		deregister()
		log.Fatalf("Failed to start client: %v", err)
	}

	// Call the service.
	resp, err := client.Call(ctx, "GET", "/users/123", nil)
	if err != nil {
		log.Printf("Service call failed: %v", err)
	} else {
		log.Printf("Service call successful: %d", resp.StatusCode)
		resp.Body.Close()
	}

	// Clean up.
	deregister()
}