学习目标
通过本章学习,你将掌握:
微服务架构基础
- 微服务设计原则
- 服务拆分策略
- 服务间通信
- 数据管理模式
容器化和Docker
- Docker容器化
- 多阶段构建
- 容器编排
- 镜像优化
Kubernetes部署
- K8s资源管理
- 服务发现
- 配置管理
- 健康检查
云原生模式
- 12-Factor应用
- 配置外部化
- 无状态设计
- 可观测性
服务网格和API网关
- Istio服务网格
- API Gateway模式
- 流量管理
- 安全策略
监控和可观测性
- 分布式追踪
- 指标收集
- 日志聚合
- 告警系统
23.1 微服务架构基础
微服务设计原则
## 23.3 Kubernetes部署
### Kubernetes资源配置
```yaml
# user-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
namespace: microservices
labels:
app: user-service
version: v1
spec:
replicas: 3
selector:
matchLabels:
app: user-service
template:
metadata:
labels:
app: user-service
version: v1
spec:
containers:
- name: user-service
image: user-service:latest
ports:
- containerPort: 8080
name: http
env:
- name: ASPNETCORE_ENVIRONMENT
value: "Production"
- name: ConnectionStrings__DefaultConnection
valueFrom:
secretKeyRef:
name: user-service-secrets
key: database-connection
- name: RabbitMQ__HostName
value: "rabbitmq-service"
- name: RabbitMQ__ServiceName
value: "user-service"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
httpGet:
path: /health/ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 3
securityContext:
runAsNonRoot: true
runAsUser: 1000
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
volumeMounts:
- name: tmp
mountPath: /tmp
- name: logs
mountPath: /app/logs
volumes:
- name: tmp
emptyDir: {}
- name: logs
emptyDir: {}
imagePullSecrets:
- name: registry-secret
---
apiVersion: v1
kind: Service
metadata:
name: user-service
namespace: microservices
labels:
app: user-service
spec:
selector:
app: user-service
ports:
- name: http
port: 80
targetPort: 8080
protocol: TCP
type: ClusterIP
---
apiVersion: v1
kind: Secret
metadata:
name: user-service-secrets
namespace: microservices
type: Opaque
data:
database-connection: U2VydmVyPXVzZXItZGI7RGF0YWJhc2U9VXNlclNlcnZpY2U7VXNlciBJZD1zYTtQYXNzd29yZD1Zb3VyUGFzc3dvcmQxMjM7VHJ1c3RTZXJ2ZXJDZXJ0aWZpY2F0ZT10cnVl
23.5 服务网格和API网关
Istio服务网格配置
# istio-gateway.yaml
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
name: microservices-gateway
namespace: microservices
spec:
selector:
istio: ingressgateway
servers:
- port:
number: 80
name: http
protocol: HTTP
hosts:
- "api.microservices.local"
- port:
number: 443
name: https
protocol: HTTPS
tls:
mode: SIMPLE
credentialName: microservices-tls
hosts:
- "api.microservices.local"
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: microservices-vs
namespace: microservices
spec:
hosts:
- "api.microservices.local"
gateways:
- microservices-gateway
http:
- match:
- uri:
prefix: "/api/users"
route:
- destination:
host: user-service
port:
number: 80
timeout: 30s
retries:
attempts: 3
perTryTimeout: 10s
- match:
- uri:
prefix: "/api/orders"
route:
- destination:
host: order-service
port:
number: 80
timeout: 30s
retries:
attempts: 3
perTryTimeout: 10s
- match:
- uri:
prefix: "/"
route:
- destination:
host: api-gateway
port:
number: 80
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: user-service-dr
namespace: microservices
spec:
host: user-service
trafficPolicy:
loadBalancer:
simple: LEAST_CONN
connectionPool:
tcp:
maxConnections: 100
http:
http1MaxPendingRequests: 50
maxRequestsPerConnection: 10
circuitBreaker:
consecutiveErrors: 5
interval: 30s
baseEjectionTime: 30s
maxEjectionPercent: 50
subsets:
- name: v1
labels:
version: v1
- name: v2
labels:
version: v2
# security-policy.yaml
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
name: default
namespace: microservices
spec:
mtls:
mode: STRICT
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: user-service-authz
namespace: microservices
spec:
selector:
matchLabels:
app: user-service
rules:
- from:
- source:
principals: ["cluster.local/ns/microservices/sa/api-gateway"]
- to:
- operation:
methods: ["GET", "POST", "PUT", "DELETE"]
when:
- key: request.headers[authorization]
values: ["Bearer *"]
API网关实现
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.RateLimiting;
using System.Threading.RateLimiting;
using Yarp.ReverseProxy.Configuration;
// API网关配置
public class ApiGatewayConfiguration
{
public Dictionary<string, ServiceRoute> Routes { get; set; } = new();
public Dictionary<string, ServiceCluster> Clusters { get; set; } = new();
public RateLimitingConfiguration RateLimiting { get; set; } = new();
public AuthenticationConfiguration Authentication { get; set; } = new();
}
public class ServiceRoute
{
public string RouteId { get; set; }
public string ClusterId { get; set; }
public RouteMatch Match { get; set; }
public List<Transform> Transforms { get; set; } = new();
public AuthorizationPolicy AuthorizationPolicy { get; set; }
public RateLimitPolicy RateLimitPolicy { get; set; }
}
public class RouteMatch
{
public string Path { get; set; }
public List<string> Methods { get; set; } = new();
public Dictionary<string, string> Headers { get; set; } = new();
}
public class Transform
{
public string Type { get; set; }
public Dictionary<string, string> Parameters { get; set; } = new();
}
public class ServiceCluster
{
public string ClusterId { get; set; }
public LoadBalancingPolicy LoadBalancing { get; set; }
public Dictionary<string, ServiceDestination> Destinations { get; set; } = new();
public HealthCheckConfig HealthCheck { get; set; }
}
public class ServiceDestination
{
public string Address { get; set; }
public HealthStatus Health { get; set; }
public Dictionary<string, string> Metadata { get; set; } = new();
}
public class LoadBalancingPolicy
{
public string Mode { get; set; } = "RoundRobin"; // RoundRobin, LeastRequests, Random
}
public class HealthCheckConfig
{
public bool Enabled { get; set; } = true;
public string Path { get; set; } = "/health";
public TimeSpan Interval { get; set; } = TimeSpan.FromSeconds(30);
public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(5);
}
public enum HealthStatus
{
Unknown,
Healthy,
Unhealthy
}
// 动态路由配置提供者
public class DynamicRouteConfigProvider : IProxyConfigProvider
{
private readonly IServiceDiscovery _serviceDiscovery;
private readonly IConfigurationManager _configurationManager;
private readonly ILogger<DynamicRouteConfigProvider> _logger;
private volatile InMemoryConfigProvider _configProvider;
public DynamicRouteConfigProvider(IServiceDiscovery serviceDiscovery, IConfigurationManager configurationManager, ILogger<DynamicRouteConfigProvider> logger)
{
_serviceDiscovery = serviceDiscovery ?? throw new ArgumentNullException(nameof(serviceDiscovery));
_configurationManager = configurationManager ?? throw new ArgumentNullException(nameof(configurationManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_configProvider = new InMemoryConfigProvider(Array.Empty<RouteConfig>(), Array.Empty<ClusterConfig>());
}
public IProxyConfig GetConfig() => _configProvider.GetConfig();
public async Task UpdateConfigurationAsync()
{
try
{
var gatewayConfig = await _configurationManager.GetConfigurationAsync<ApiGatewayConfiguration>("api-gateway-config", "gateway.json");
if (gatewayConfig == null)
{
_logger.LogWarning("No gateway configuration found");
return;
}
var routes = new List<RouteConfig>();
var clusters = new List<ClusterConfig>();
// 构建路由配置
foreach (var route in gatewayConfig.Routes.Values)
{
var routeConfig = new RouteConfig
{
RouteId = route.RouteId,
ClusterId = route.ClusterId,
Match = new RouteMatch
{
Path = route.Match.Path,
Methods = route.Match.Methods
},
Transforms = route.Transforms.Select(t => new Dictionary<string, string>
{
["Type"] = t.Type
}.Concat(t.Parameters).ToDictionary(kvp => kvp.Key, kvp => kvp.Value)).ToList()
};
routes.Add(routeConfig);
}
// 构建集群配置
foreach (var cluster in gatewayConfig.Clusters.Values)
{
var destinations = new Dictionary<string, DestinationConfig>();
// 从服务发现获取健康的服务实例
var serviceEndpoints = await _serviceDiscovery.DiscoverServicesAsync(cluster.ClusterId);
foreach (var endpoint in serviceEndpoints)
{
destinations[endpoint.Host] = new DestinationConfig
{
Address = endpoint.BaseUrl
};
}
var clusterConfig = new ClusterConfig
{
ClusterId = cluster.ClusterId,
LoadBalancingPolicy = cluster.LoadBalancing.Mode,
Destinations = destinations,
HealthCheck = new HealthCheckConfig
{
Enabled = cluster.HealthCheck.Enabled,
Policy = new HealthCheckPolicy
{
Path = cluster.HealthCheck.Path,
Interval = cluster.HealthCheck.Interval,
Timeout = cluster.HealthCheck.Timeout
}
}
};
clusters.Add(clusterConfig);
}
_configProvider = new InMemoryConfigProvider(routes, clusters);
_logger.LogInformation("Updated gateway configuration with {RouteCount} routes and {ClusterCount} clusters", routes.Count, clusters.Count);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error updating gateway configuration");
}
}
}
// 认证和授权中间件
public class ApiGatewayAuthenticationMiddleware
{
private readonly RequestDelegate _next;
private readonly ILogger<ApiGatewayAuthenticationMiddleware> _logger;
public ApiGatewayAuthenticationMiddleware(RequestDelegate next, ILogger<ApiGatewayAuthenticationMiddleware> logger)
{
_next = next ?? throw new ArgumentNullException(nameof(next));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task InvokeAsync(HttpContext context)
{
// 提取JWT令牌
var token = ExtractToken(context.Request);
if (!string.IsNullOrEmpty(token))
{
try
{
// 验证JWT令牌
var principal = ValidateToken(token);
context.User = principal;
// 添加用户信息到请求头
context.Request.Headers["X-User-Id"] = principal.FindFirst("sub")?.Value;
context.Request.Headers["X-User-Email"] = principal.FindFirst("email")?.Value;
context.Request.Headers["X-User-Roles"] = string.Join(",", principal.FindAll("role").Select(c => c.Value));
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Invalid JWT token");
context.Response.StatusCode = 401;
await context.Response.WriteAsync("Unauthorized");
return;
}
}
await _next(context);
}
private string ExtractToken(HttpRequest request)
{
var authHeader = request.Headers["Authorization"].FirstOrDefault();
if (authHeader?.StartsWith("Bearer ") == true)
{
return authHeader.Substring("Bearer ".Length).Trim();
}
return null;
}
private ClaimsPrincipal ValidateToken(string token)
{
// 这里应该实现JWT令牌验证逻辑
// 返回验证后的ClaimsPrincipal
throw new NotImplementedException("JWT token validation not implemented");
}
}
// 限流中间件
public class RateLimitingMiddleware
{
private readonly RequestDelegate _next;
private readonly RateLimiter _rateLimiter;
private readonly ILogger<RateLimitingMiddleware> _logger;
public RateLimitingMiddleware(RequestDelegate next, RateLimiter rateLimiter, ILogger<RateLimitingMiddleware> logger)
{
_next = next ?? throw new ArgumentNullException(nameof(next));
_rateLimiter = rateLimiter ?? throw new ArgumentNullException(nameof(rateLimiter));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task InvokeAsync(HttpContext context)
{
var clientId = GetClientId(context);
using var lease = await _rateLimiter.AcquireAsync(1, context.RequestAborted);
if (!lease.IsAcquired)
{
_logger.LogWarning("Rate limit exceeded for client {ClientId}", clientId);
context.Response.StatusCode = 429;
context.Response.Headers["Retry-After"] = "60";
await context.Response.WriteAsync("Rate limit exceeded");
return;
}
await _next(context);
}
private string GetClientId(HttpContext context)
{
// 从用户身份、IP地址或API密钥获取客户端ID
var userId = context.User?.FindFirst("sub")?.Value;
if (!string.IsNullOrEmpty(userId))
{
return $"user:{userId}";
}
var apiKey = context.Request.Headers["X-API-Key"].FirstOrDefault();
if (!string.IsNullOrEmpty(apiKey))
{
return $"api:{apiKey}";
}
return $"ip:{context.Connection.RemoteIpAddress}";
}
}
## 23.6 监控和可观测性
### 分布式追踪实现
```csharp
using OpenTelemetry;
using OpenTelemetry.Trace;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using System.Diagnostics;
using System.Diagnostics.Metrics;
// 分布式追踪配置
public static class TelemetryExtensions
{
public static IServiceCollection AddCustomTelemetry(this IServiceCollection services, IConfiguration configuration)
{
var serviceName = configuration["Monitoring:Tracing:ServiceName"] ?? "unknown-service";
var serviceVersion = configuration["Monitoring:Tracing:ServiceVersion"] ?? "1.0.0";
services.AddOpenTelemetry()
.WithTracing(builder =>
{
builder
.SetResourceBuilder(ResourceBuilder.CreateDefault()
.AddService(serviceName, serviceVersion)
.AddAttributes(new Dictionary<string, object>
{
["deployment.environment"] = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") ?? "unknown",
["service.instance.id"] = Environment.MachineName
}))
.AddAspNetCoreInstrumentation(options =>
{
options.RecordException = true;
options.EnrichWithHttpRequest = (activity, request) =>
{
activity.SetTag("http.request.body.size", request.ContentLength);
activity.SetTag("http.user_agent", request.Headers["User-Agent"].ToString());
};
options.EnrichWithHttpResponse = (activity, response) =>
{
activity.SetTag("http.response.body.size", response.ContentLength);
};
})
.AddHttpClientInstrumentation(options =>
{
options.RecordException = true;
options.EnrichWithHttpRequestMessage = (activity, request) =>
{
activity.SetTag("http.request.method", request.Method.ToString());
};
options.EnrichWithHttpResponseMessage = (activity, response) =>
{
activity.SetTag("http.response.status_code", (int)response.StatusCode);
};
})
.AddEntityFrameworkCoreInstrumentation(options =>
{
options.SetDbStatementForText = true;
options.SetDbStatementForStoredProcedure = true;
options.EnrichWithIDbCommand = (activity, command) =>
{
activity.SetTag("db.command.timeout", command.CommandTimeout);
};
})
.AddSource("UserService")
.AddSource("OrderService")
.AddJaegerExporter(options =>
{
options.Endpoint = new Uri(configuration["Monitoring:Tracing:JaegerEndpoint"] ?? "http://localhost:14268/api/traces");
});
})
.WithMetrics(builder =>
{
builder
.SetResourceBuilder(ResourceBuilder.CreateDefault()
.AddService(serviceName, serviceVersion))
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddRuntimeInstrumentation()
.AddProcessInstrumentation()
.AddMeter("UserService")
.AddMeter("OrderService")
.AddPrometheusExporter();
});
return services;
}
}
// 自定义指标收集器
public class CustomMetricsCollector
{
private readonly Meter _meter;
private readonly Counter<long> _requestCounter;
private readonly Histogram<double> _requestDuration;
private readonly Counter<long> _errorCounter;
private readonly Gauge<int> _activeConnections;
private readonly ILogger<CustomMetricsCollector> _logger;
public CustomMetricsCollector(ILogger<CustomMetricsCollector> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_meter = new Meter("UserService", "1.0.0");
_requestCounter = _meter.CreateCounter<long>(
"http_requests_total",
"requests",
"Total number of HTTP requests");
_requestDuration = _meter.CreateHistogram<double>(
"http_request_duration_seconds",
"seconds",
"Duration of HTTP requests");
_errorCounter = _meter.CreateCounter<long>(
"http_errors_total",
"errors",
"Total number of HTTP errors");
_activeConnections = _meter.CreateGauge<int>(
"http_active_connections",
"connections",
"Number of active HTTP connections");
}
public void RecordRequest(string method, string endpoint, int statusCode, double duration)
{
var tags = new KeyValuePair<string, object>[]
{
new("method", method),
new("endpoint", endpoint),
new("status_code", statusCode.ToString())
};
_requestCounter.Add(1, tags);
_requestDuration.Record(duration, tags);
if (statusCode >= 400)
{
_errorCounter.Add(1, tags);
}
}
public void UpdateActiveConnections(int count)
{
_activeConnections.Record(count);
}
}
// 结构化日志记录器
public class StructuredLogger
{
private readonly ILogger _logger;
private readonly ActivitySource _activitySource;
public StructuredLogger(ILogger<StructuredLogger> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_activitySource = new ActivitySource("UserService");
}
public void LogUserCreated(Guid userId, string email, TimeSpan duration)
{
using var activity = _activitySource.StartActivity("user.created");
activity?.SetTag("user.id", userId.ToString());
activity?.SetTag("user.email", email);
activity?.SetTag("operation.duration_ms", duration.TotalMilliseconds);
_logger.LogInformation(
"User created successfully. UserId: {UserId}, Email: {Email}, Duration: {Duration}ms",
userId, email, duration.TotalMilliseconds);
}
public void LogUserOperationFailed(Guid userId, string operation, Exception exception)
{
using var activity = _activitySource.StartActivity($"user.{operation}.failed");
activity?.SetTag("user.id", userId.ToString());
activity?.SetTag("operation", operation);
activity?.SetTag("error.type", exception.GetType().Name);
activity?.SetTag("error.message", exception.Message);
activity?.SetStatus(ActivityStatusCode.Error, exception.Message);
_logger.LogError(exception,
"User operation failed. UserId: {UserId}, Operation: {Operation}, Error: {ErrorType}",
userId, operation, exception.GetType().Name);
}
public void LogPerformanceMetric(string operation, TimeSpan duration, Dictionary<string, object> additionalData = null)
{
using var activity = _activitySource.StartActivity($"performance.{operation}");
activity?.SetTag("operation", operation);
activity?.SetTag("duration_ms", duration.TotalMilliseconds);
if (additionalData != null)
{
foreach (var kvp in additionalData)
{
activity?.SetTag(kvp.Key, kvp.Value?.ToString());
}
}
var logData = new Dictionary<string, object>
{
["Operation"] = operation,
["Duration"] = duration.TotalMilliseconds
};
if (additionalData != null)
{
foreach (var kvp in additionalData)
{
logData[kvp.Key] = kvp.Value;
}
}
_logger.LogInformation("Performance metric recorded: {LogData}", logData);
}
}
// 健康检查和监控中间件
public class MonitoringMiddleware
{
private readonly RequestDelegate _next;
private readonly CustomMetricsCollector _metricsCollector;
private readonly StructuredLogger _structuredLogger;
private readonly ILogger<MonitoringMiddleware> _logger;
public MonitoringMiddleware(RequestDelegate next, CustomMetricsCollector metricsCollector, StructuredLogger structuredLogger, ILogger<MonitoringMiddleware> logger)
{
_next = next ?? throw new ArgumentNullException(nameof(next));
_metricsCollector = metricsCollector ?? throw new ArgumentNullException(nameof(metricsCollector));
_structuredLogger = structuredLogger ?? throw new ArgumentNullException(nameof(structuredLogger));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task InvokeAsync(HttpContext context)
{
var stopwatch = Stopwatch.StartNew();
var requestId = Guid.NewGuid().ToString();
// 添加请求ID到响应头
context.Response.Headers["X-Request-Id"] = requestId;
// 添加追踪信息
using var activity = Activity.Current;
activity?.SetTag("http.request.id", requestId);
activity?.SetTag("http.request.path", context.Request.Path);
activity?.SetTag("http.request.method", context.Request.Method);
try
{
await _next(context);
}
catch (Exception ex)
{
_logger.LogError(ex, "Unhandled exception in request {RequestId}", requestId);
context.Response.StatusCode = 500;
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
finally
{
stopwatch.Stop();
// 记录指标
_metricsCollector.RecordRequest(
context.Request.Method,
context.Request.Path,
context.Response.StatusCode,
stopwatch.Elapsed.TotalSeconds);
// 记录性能日志
_structuredLogger.LogPerformanceMetric("http_request", stopwatch.Elapsed, new Dictionary<string, object>
{
["request_id"] = requestId,
["method"] = context.Request.Method,
["path"] = context.Request.Path.ToString(),
["status_code"] = context.Response.StatusCode,
["user_agent"] = context.Request.Headers["User-Agent"].ToString()
});
}
}
}
### 告警和监控配置
```yaml
# prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-config
namespace: monitoring
data:
prometheus.yml: |
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "alert_rules.yml"
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
scrape_configs:
- job_name: 'kubernetes-apiservers'
kubernetes_sd_configs:
- role: endpoints
scheme: https
tls_config:
ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
relabel_configs:
- source_labels: [__meta_kubernetes_namespace, __meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
action: keep
regex: default;kubernetes;https
- job_name: 'kubernetes-nodes'
kubernetes_sd_configs:
- role: node
scheme: https
tls_config:
ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
relabel_configs:
- action: labelmap
regex: __meta_kubernetes_node_label_(.+)
- target_label: __address__
replacement: kubernetes.default.svc:443
- source_labels: [__meta_kubernetes_node_name]
regex: (.+)
target_label: __metrics_path__
replacement: /api/v1/nodes/${1}/proxy/metrics
- job_name: 'microservices'
kubernetes_sd_configs:
- role: endpoints
namespaces:
names:
- microservices
relabel_configs:
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scrape]
action: keep
regex: true
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path]
action: replace
target_label: __metrics_path__
regex: (.+)
- source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
action: replace
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
target_label: __address__
- action: labelmap
regex: __meta_kubernetes_service_label_(.+)
- source_labels: [__meta_kubernetes_namespace]
action: replace
target_label: kubernetes_namespace
- source_labels: [__meta_kubernetes_service_name]
action: replace
target_label: kubernetes_name
alert_rules.yml: |
groups:
- name: microservices.rules
rules:
- alert: HighErrorRate
expr: rate(http_errors_total[5m]) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value }} errors per second for {{ $labels.job }}"
- alert: HighLatency
expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 0.5
for: 5m
labels:
severity: warning
annotations:
summary: "High latency detected"
description: "95th percentile latency is {{ $value }}s for {{ $labels.job }}"
- alert: ServiceDown
expr: up == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Service is down"
description: "{{ $labels.job }} has been down for more than 1 minute"
- alert: HighMemoryUsage
expr: (process_resident_memory_bytes / 1024 / 1024) > 500
for: 5m
labels:
severity: warning
annotations:
summary: "High memory usage"
description: "Memory usage is {{ $value }}MB for {{ $labels.job }}"
- alert: HighCPUUsage
expr: rate(process_cpu_seconds_total[5m]) * 100 > 80
for: 5m
labels:
severity: warning
annotations:
summary: "High CPU usage"
description: "CPU usage is {{ $value }}% for {{ $labels.job }}"
// 告警管理器
public class AlertManager
{
private readonly ILogger<AlertManager> _logger;
private readonly IConfiguration _configuration;
private readonly HttpClient _httpClient;
public AlertManager(ILogger<AlertManager> logger, IConfiguration configuration, HttpClient httpClient)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
_httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
}
public async Task SendAlertAsync(Alert alert)
{
try
{
var alertPayload = new
{
alerts = new[]
{
new
{
labels = alert.Labels,
annotations = alert.Annotations,
startsAt = alert.StartsAt.ToString("yyyy-MM-ddTHH:mm:ss.fffZ"),
endsAt = alert.EndsAt?.ToString("yyyy-MM-ddTHH:mm:ss.fffZ"),
generatorURL = alert.GeneratorUrl
}
}
};
var json = JsonSerializer.Serialize(alertPayload);
var content = new StringContent(json, Encoding.UTF8, "application/json");
var alertmanagerUrl = _configuration["Monitoring:Alertmanager:Url"] ?? "http://localhost:9093";
var response = await _httpClient.PostAsync($"{alertmanagerUrl}/api/v1/alerts", content);
if (response.IsSuccessStatusCode)
{
_logger.LogInformation("Alert sent successfully: {AlertName}", alert.Labels.GetValueOrDefault("alertname"));
}
else
{
_logger.LogError("Failed to send alert: {StatusCode} - {Content}", response.StatusCode, await response.Content.ReadAsStringAsync());
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error sending alert: {AlertName}", alert.Labels.GetValueOrDefault("alertname"));
}
}
public async Task SendHealthCheckAlertAsync(string serviceName, string status, string description)
{
var alert = new Alert
{
Labels = new Dictionary<string, string>
{
["alertname"] = "HealthCheckFailed",
["service"] = serviceName,
["severity"] = status == "Unhealthy" ? "critical" : "warning"
},
Annotations = new Dictionary<string, string>
{
["summary"] = $"Health check failed for {serviceName}",
["description"] = description
},
StartsAt = DateTime.UtcNow,
GeneratorUrl = $"http://localhost/health/{serviceName}"
};
await SendAlertAsync(alert);
}
}
public class Alert
{
public Dictionary<string, string> Labels { get; set; } = new();
public Dictionary<string, string> Annotations { get; set; } = new();
public DateTime StartsAt { get; set; }
public DateTime? EndsAt { get; set; }
public string GeneratorUrl { get; set; }
}
// 性能监控服务
public class PerformanceMonitoringService
{
private readonly CustomMetricsCollector _metricsCollector;
private readonly AlertManager _alertManager;
private readonly ILogger<PerformanceMonitoringService> _logger;
private readonly Timer _monitoringTimer;
public PerformanceMonitoringService(CustomMetricsCollector metricsCollector, AlertManager alertManager, ILogger<PerformanceMonitoringService> logger)
{
_metricsCollector = metricsCollector ?? throw new ArgumentNullException(nameof(metricsCollector));
_alertManager = alertManager ?? throw new ArgumentNullException(nameof(alertManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_monitoringTimer = new Timer(MonitorPerformance, null, TimeSpan.Zero, TimeSpan.FromMinutes(1));
}
private async void MonitorPerformance(object state)
{
try
{
// 监控内存使用情况
var memoryUsage = GC.GetTotalMemory(false) / 1024 / 1024; // MB
if (memoryUsage > 500)
{
await _alertManager.SendAlertAsync(new Alert
{
Labels = new Dictionary<string, string>
{
["alertname"] = "HighMemoryUsage",
["service"] = Environment.GetEnvironmentVariable("SERVICE_NAME") ?? "unknown",
["severity"] = "warning"
},
Annotations = new Dictionary<string, string>
{
["summary"] = "High memory usage detected",
["description"] = $"Memory usage is {memoryUsage}MB"
},
StartsAt = DateTime.UtcNow
});
}
// 监控活动连接数
var activeConnections = GetActiveConnectionCount();
_metricsCollector.UpdateActiveConnections(activeConnections);
if (activeConnections > 1000)
{
await _alertManager.SendAlertAsync(new Alert
{
Labels = new Dictionary<string, string>
{
["alertname"] = "HighConnectionCount",
["service"] = Environment.GetEnvironmentVariable("SERVICE_NAME") ?? "unknown",
["severity"] = "warning"
},
Annotations = new Dictionary<string, string>
{
["summary"] = "High connection count detected",
["description"] = $"Active connections: {activeConnections}"
},
StartsAt = DateTime.UtcNow
});
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during performance monitoring");
}
}
private int GetActiveConnectionCount()
{
// 这里应该实现获取活动连接数的逻辑
// 可以通过性能计数器或其他方式获取
return Random.Shared.Next(50, 200);
}
public void Dispose()
{
_monitoringTimer?.Dispose();
}
}
23.7 实践练习
练习1:构建完整的微服务系统
// 微服务启动配置
public class MicroserviceStartup
{
public void ConfigureServices(IServiceCollection services, IConfiguration configuration)
{
// 基础服务
services.AddControllers();
services.AddEndpointsApiExplorer();
services.AddSwaggerGen();
// 数据库
services.AddDbContext<ApplicationDbContext>(options =>
options.UseNpgsql(configuration.GetConnectionString("DefaultConnection")));
// 消息总线
services.Configure<RabbitMQOptions>(configuration.GetSection("RabbitMQ"));
services.AddSingleton<IMessageBus, RabbitMQMessageBus>();
// 服务发现
services.AddSingleton<IServiceDiscovery, KubernetesServiceDiscovery>();
services.AddSingleton<IConfigurationManager, KubernetesConfigurationManager>();
// 健康检查
services.AddHealthChecks()
.AddDbContextCheck<ApplicationDbContext>()
.AddCheck<ExternalServiceHealthCheck>("external-service")
.AddCheck<RabbitMQHealthCheck>("rabbitmq");
// 监控和追踪
services.AddCustomTelemetry(configuration);
services.AddSingleton<CustomMetricsCollector>();
services.AddSingleton<StructuredLogger>();
services.AddSingleton<AlertManager>();
services.AddSingleton<PerformanceMonitoringService>();
// API网关(仅在网关服务中)
if (configuration.GetValue<bool>("IsApiGateway"))
{
services.AddReverseProxy()
.LoadFromConfig(configuration.GetSection("ReverseProxy"));
services.AddSingleton<DynamicRouteConfigProvider>();
// 限流
services.AddRateLimiter(options =>
{
options.AddTokenBucketLimiter("api", limiterOptions =>
{
limiterOptions.TokenLimit = 100;
limiterOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
limiterOptions.QueueLimit = 50;
limiterOptions.ReplenishmentPeriod = TimeSpan.FromSeconds(1);
limiterOptions.TokensPerPeriod = 10;
});
});
}
// 认证和授权
services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
.AddJwtBearer(options =>
{
options.Authority = configuration["Authentication:Authority"];
options.Audience = configuration["Authentication:Audience"];
options.RequireHttpsMetadata = false; // 仅开发环境
});
services.AddAuthorization();
// 应用服务
services.AddScoped<IUserService, UserService>();
services.AddScoped<IOrderService, OrderService>();
services.AddScoped<IUserRepository, UserRepository>();
services.AddScoped<IOrderRepository, OrderRepository>();
services.AddScoped<IDomainEventDispatcher, DomainEventDispatcher>();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
app.UseSwagger();
app.UseSwaggerUI();
}
// 中间件管道
app.UseMiddleware<MonitoringMiddleware>();
app.UseMiddleware<HealthCheckMiddleware>();
if (app.ApplicationServices.GetService<IConfiguration>().GetValue<bool>("IsApiGateway"))
{
app.UseMiddleware<ApiGatewayAuthenticationMiddleware>();
app.UseMiddleware<RateLimitingMiddleware>();
}
app.UseRouting();
app.UseAuthentication();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
endpoints.MapHealthChecks("/health");
if (app.ApplicationServices.GetService<IConfiguration>().GetValue<bool>("IsApiGateway"))
{
endpoints.MapReverseProxy();
}
});
}
}
// 用户服务控制器
[ApiController]
[Route("api/[controller]")]
[Authorize]
public class UsersController : ControllerBase
{
private readonly IUserService _userService;
private readonly StructuredLogger _logger;
public UsersController(IUserService userService, StructuredLogger logger)
{
_userService = userService ?? throw new ArgumentNullException(nameof(userService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
[HttpGet]
public async Task<ActionResult<IEnumerable<UserDto>>> GetUsers([FromQuery] int page = 1, [FromQuery] int size = 10)
{
var stopwatch = Stopwatch.StartNew();
try
{
var users = await _userService.GetUsersAsync(page, size);
stopwatch.Stop();
_logger.LogPerformanceMetric("get_users", stopwatch.Elapsed, new Dictionary<string, object>
{
["page"] = page,
["size"] = size,
["result_count"] = users.Count()
});
return Ok(users);
}
catch (Exception ex)
{
_logger.LogUserOperationFailed(Guid.Empty, "get_users", ex);
return StatusCode(500, "Internal server error");
}
}
[HttpPost]
public async Task<ActionResult<UserDto>> CreateUser([FromBody] CreateUserDto createUserDto)
{
var stopwatch = Stopwatch.StartNew();
try
{
var user = await _userService.CreateUserAsync(createUserDto);
stopwatch.Stop();
_logger.LogUserCreated(user.Id, user.Email, stopwatch.Elapsed);
return CreatedAtAction(nameof(GetUser), new { id = user.Id }, user);
}
catch (Exception ex)
{
_logger.LogUserOperationFailed(Guid.Empty, "create_user", ex);
return StatusCode(500, "Internal server error");
}
}
[HttpGet("{id}")]
public async Task<ActionResult<UserDto>> GetUser(Guid id)
{
var stopwatch = Stopwatch.StartNew();
try
{
var user = await _userService.GetUserByIdAsync(id);
if (user == null)
{
return NotFound();
}
stopwatch.Stop();
_logger.LogPerformanceMetric("get_user", stopwatch.Elapsed, new Dictionary<string, object>
{
["user_id"] = id
});
return Ok(user);
}
catch (Exception ex)
{
_logger.LogUserOperationFailed(id, "get_user", ex);
return StatusCode(500, "Internal server error");
}
}
}
练习2:部署配置优化
# production-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
namespace: microservices
labels:
app: user-service
version: v1
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
selector:
matchLabels:
app: user-service
version: v1
template:
metadata:
labels:
app: user-service
version: v1
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8080"
prometheus.io/path: "/metrics"
spec:
serviceAccountName: user-service
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 2000
containers:
- name: user-service
image: myregistry/user-service:v1.2.0
imagePullPolicy: Always
ports:
- containerPort: 8080
name: http
- containerPort: 8081
name: metrics
env:
- name: ASPNETCORE_ENVIRONMENT
value: "Production"
- name: SERVICE_NAME
value: "user-service"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
envFrom:
- configMapRef:
name: user-service-config
- secretRef:
name: user-service-secrets
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health/live
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
httpGet:
path: /health/ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 3
startupProbe:
httpGet:
path: /health/startup
port: 8080
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 10
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop:
- ALL
volumeMounts:
- name: tmp
mountPath: /tmp
- name: logs
mountPath: /app/logs
volumes:
- name: tmp
emptyDir: {}
- name: logs
emptyDir: {}
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app
operator: In
values:
- user-service
topologyKey: kubernetes.io/hostname
---
apiVersion: v1
kind: Service
metadata:
name: user-service
namespace: microservices
labels:
app: user-service
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8080"
spec:
selector:
app: user-service
ports:
- name: http
port: 80
targetPort: 8080
protocol: TCP
- name: metrics
port: 8081
targetPort: 8081
protocol: TCP
type: ClusterIP
本章总结
通过本章学习,我们深入了解了云原生和微服务架构的核心概念和实践技术:
核心概念
微服务架构设计
- 领域驱动设计(DDD)
- 服务拆分策略
- 数据一致性管理
- 服务间通信模式
容器化技术
- Docker容器化最佳实践
- 多阶段构建优化
- 容器编排和管理
- 镜像安全和优化
Kubernetes部署
- 资源管理和配置
- 服务发现和负载均衡
- 配置管理和密钥管理
- 健康检查和自动恢复
云原生模式
- 12-Factor应用原则
- 配置外部化
- 无状态服务设计
- 可观测性实现
高级技术
服务网格
- Istio流量管理
- 安全策略配置
- 可观测性增强
- 故障注入和测试
API网关
- 动态路由配置
- 认证和授权
- 限流和熔断
- 监控和日志
监控和可观测性
- 分布式追踪
- 指标收集和告警
- 结构化日志
- 性能监控
实际应用
企业级微服务
- 大规模服务治理
- 多环境部署
- 持续集成/持续部署
- 灾难恢复
云原生应用
- 弹性伸缩
- 故障隔离
- 零停机部署
- 多云部署
DevOps实践
- 基础设施即代码
- 自动化运维
- 监控驱动开发
- 安全左移
重要技能
架构设计能力
- 微服务拆分
- 数据架构设计
- 通信模式选择
- 容错设计
运维自动化
- 容器编排
- 监控告警
- 日志分析
- 性能调优
安全意识
- 零信任架构
- 密钥管理
- 网络安全
- 合规要求
云原生和微服务架构是现代软件开发的重要趋势,掌握这些技术将帮助你构建可扩展、可维护、高可用的企业级应用系统。下一章我们将探讨C#在人工智能和机器学习领域的应用。
# configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: user-service-config
namespace: microservices
data:
appsettings.json: |
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"RabbitMQ": {
"ExchangeName": "microservices.events",
"Port": 5672,
"UserName": "admin",
"VirtualHost": "/"
},
"HealthChecks": {
"UI": {
"HealthCheckDatabaseConnectionString": "Data Source=healthchecks.db"
}
}
}
# horizontal-pod-autoscaler.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: user-service-hpa
namespace: microservices
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: user-service
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10
periodSeconds: 60
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 50
periodSeconds: 60
服务发现和配置管理
using Microsoft.Extensions.Options;
using k8s;
using k8s.Models;
// Kubernetes服务发现
public interface IServiceDiscovery
{
Task<IEnumerable<ServiceEndpoint>> DiscoverServicesAsync(string serviceName, CancellationToken cancellationToken = default);
Task<ServiceEndpoint> GetServiceEndpointAsync(string serviceName, CancellationToken cancellationToken = default);
}
public class KubernetesServiceDiscovery : IServiceDiscovery
{
private readonly IKubernetes _kubernetesClient;
private readonly KubernetesOptions _options;
private readonly ILogger<KubernetesServiceDiscovery> _logger;
public KubernetesServiceDiscovery(IKubernetes kubernetesClient, IOptions<KubernetesOptions> options, ILogger<KubernetesServiceDiscovery> logger)
{
_kubernetesClient = kubernetesClient ?? throw new ArgumentNullException(nameof(kubernetesClient));
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<IEnumerable<ServiceEndpoint>> DiscoverServicesAsync(string serviceName, CancellationToken cancellationToken = default)
{
try
{
var service = await _kubernetesClient.ReadNamespacedServiceAsync(serviceName, _options.Namespace, cancellationToken: cancellationToken);
var endpoints = await _kubernetesClient.ReadNamespacedEndpointsAsync(serviceName, _options.Namespace, cancellationToken: cancellationToken);
var serviceEndpoints = new List<ServiceEndpoint>();
if (endpoints?.Subsets != null)
{
foreach (var subset in endpoints.Subsets)
{
if (subset.Addresses != null && subset.Ports != null)
{
foreach (var address in subset.Addresses)
{
foreach (var port in subset.Ports)
{
serviceEndpoints.Add(new ServiceEndpoint
{
ServiceName = serviceName,
Host = address.Ip,
Port = port.Port ?? 80,
Scheme = port.Name == "https" ? "https" : "http"
});
}
}
}
}
}
return serviceEndpoints;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error discovering services for {ServiceName}", serviceName);
throw;
}
}
public async Task<ServiceEndpoint> GetServiceEndpointAsync(string serviceName, CancellationToken cancellationToken = default)
{
var endpoints = await DiscoverServicesAsync(serviceName, cancellationToken);
return endpoints.FirstOrDefault() ?? throw new InvalidOperationException($"No endpoints found for service {serviceName}");
}
}
public class ServiceEndpoint
{
public string ServiceName { get; set; }
public string Host { get; set; }
public int Port { get; set; }
public string Scheme { get; set; } = "http";
public string BaseUrl => $"{Scheme}://{Host}:{Port}";
}
public class KubernetesOptions
{
public string Namespace { get; set; } = "default";
public string ConfigPath { get; set; }
public bool InCluster { get; set; } = true;
}
// 配置管理
public interface IConfigurationManager
{
Task<T> GetConfigurationAsync<T>(string configMapName, string key, CancellationToken cancellationToken = default) where T : class;
Task UpdateConfigurationAsync<T>(string configMapName, string key, T configuration, CancellationToken cancellationToken = default) where T : class;
Task<Dictionary<string, string>> GetAllConfigurationsAsync(string configMapName, CancellationToken cancellationToken = default);
}
public class KubernetesConfigurationManager : IConfigurationManager
{
private readonly IKubernetes _kubernetesClient;
private readonly KubernetesOptions _options;
private readonly ILogger<KubernetesConfigurationManager> _logger;
public KubernetesConfigurationManager(IKubernetes kubernetesClient, IOptions<KubernetesOptions> options, ILogger<KubernetesConfigurationManager> logger)
{
_kubernetesClient = kubernetesClient ?? throw new ArgumentNullException(nameof(kubernetesClient));
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<T> GetConfigurationAsync<T>(string configMapName, string key, CancellationToken cancellationToken = default) where T : class
{
try
{
var configMap = await _kubernetesClient.ReadNamespacedConfigMapAsync(configMapName, _options.Namespace, cancellationToken: cancellationToken);
if (configMap.Data?.TryGetValue(key, out var value) == true)
{
return JsonSerializer.Deserialize<T>(value, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
});
}
return null;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting configuration {Key} from ConfigMap {ConfigMapName}", key, configMapName);
throw;
}
}
public async Task UpdateConfigurationAsync<T>(string configMapName, string key, T configuration, CancellationToken cancellationToken = default) where T : class
{
try
{
var configMap = await _kubernetesClient.ReadNamespacedConfigMapAsync(configMapName, _options.Namespace, cancellationToken: cancellationToken);
var json = JsonSerializer.Serialize(configuration, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = true
});
configMap.Data ??= new Dictionary<string, string>();
configMap.Data[key] = json;
await _kubernetesClient.ReplaceNamespacedConfigMapAsync(configMap, configMapName, _options.Namespace, cancellationToken: cancellationToken);
_logger.LogInformation("Updated configuration {Key} in ConfigMap {ConfigMapName}", key, configMapName);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error updating configuration {Key} in ConfigMap {ConfigMapName}", key, configMapName);
throw;
}
}
public async Task<Dictionary<string, string>> GetAllConfigurationsAsync(string configMapName, CancellationToken cancellationToken = default)
{
try
{
var configMap = await _kubernetesClient.ReadNamespacedConfigMapAsync(configMapName, _options.Namespace, cancellationToken: cancellationToken);
return configMap.Data ?? new Dictionary<string, string>();
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting all configurations from ConfigMap {ConfigMapName}", configMapName);
throw;
}
}
}
23.4 云原生模式
12-Factor应用实现
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
// 配置外部化
public class CloudNativeConfiguration
{
public DatabaseConfiguration Database { get; set; } = new();
public MessagingConfiguration Messaging { get; set; } = new();
public CacheConfiguration Cache { get; set; } = new();
public LoggingConfiguration Logging { get; set; } = new();
public SecurityConfiguration Security { get; set; } = new();
public MonitoringConfiguration Monitoring { get; set; } = new();
}
public class DatabaseConfiguration
{
public string ConnectionString { get; set; }
public int MaxRetryAttempts { get; set; } = 3;
public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(2);
public int CommandTimeout { get; set; } = 30;
public bool EnableSensitiveDataLogging { get; set; } = false;
}
public class MessagingConfiguration
{
public string HostName { get; set; } = "localhost";
public int Port { get; set; } = 5672;
public string UserName { get; set; }
public string Password { get; set; }
public string VirtualHost { get; set; } = "/";
public string ExchangeName { get; set; } = "microservices.events";
public int MaxRetryAttempts { get; set; } = 3;
public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(5);
}
public class CacheConfiguration
{
public string ConnectionString { get; set; }
public TimeSpan DefaultExpiration { get; set; } = TimeSpan.FromMinutes(30);
public int Database { get; set; } = 0;
public string KeyPrefix { get; set; }
}
public class LoggingConfiguration
{
public string Level { get; set; } = "Information";
public bool EnableStructuredLogging { get; set; } = true;
public string OutputTemplate { get; set; }
public ElasticsearchConfiguration Elasticsearch { get; set; } = new();
}
public class ElasticsearchConfiguration
{
public string Url { get; set; }
public string IndexFormat { get; set; } = "logs-{0:yyyy.MM.dd}";
public string Username { get; set; }
public string Password { get; set; }
}
public class SecurityConfiguration
{
public JwtConfiguration Jwt { get; set; } = new();
public CorsConfiguration Cors { get; set; } = new();
}
public class JwtConfiguration
{
public string SecretKey { get; set; }
public string Issuer { get; set; }
public string Audience { get; set; }
public TimeSpan ExpirationTime { get; set; } = TimeSpan.FromHours(1);
}
public class CorsConfiguration
{
public string[] AllowedOrigins { get; set; } = Array.Empty<string>();
public string[] AllowedMethods { get; set; } = { "GET", "POST", "PUT", "DELETE" };
public string[] AllowedHeaders { get; set; } = { "Content-Type", "Authorization" };
}
public class MonitoringConfiguration
{
public MetricsConfiguration Metrics { get; set; } = new();
public TracingConfiguration Tracing { get; set; } = new();
}
public class MetricsConfiguration
{
public string Endpoint { get; set; } = "/metrics";
public bool EnableHttpMetrics { get; set; } = true;
public bool EnableSystemMetrics { get; set; } = true;
}
public class TracingConfiguration
{
public string ServiceName { get; set; }
public string JaegerEndpoint { get; set; }
public double SamplingRate { get; set; } = 0.1;
}
// 环境感知配置加载器
public class EnvironmentAwareConfigurationLoader
{
private readonly IConfiguration _configuration;
private readonly IHostEnvironment _environment;
private readonly ILogger<EnvironmentAwareConfigurationLoader> _logger;
public EnvironmentAwareConfigurationLoader(IConfiguration configuration, IHostEnvironment environment, ILogger<EnvironmentAwareConfigurationLoader> logger)
{
_configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
_environment = environment ?? throw new ArgumentNullException(nameof(environment));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public CloudNativeConfiguration LoadConfiguration()
{
var config = new CloudNativeConfiguration();
// 绑定配置
_configuration.Bind(config);
// 环境特定的配置覆盖
if (_environment.IsDevelopment())
{
ApplyDevelopmentOverrides(config);
}
else if (_environment.IsProduction())
{
ApplyProductionOverrides(config);
}
// 验证配置
ValidateConfiguration(config);
_logger.LogInformation("Configuration loaded for environment: {Environment}", _environment.EnvironmentName);
return config;
}
private void ApplyDevelopmentOverrides(CloudNativeConfiguration config)
{
config.Database.EnableSensitiveDataLogging = true;
config.Logging.Level = "Debug";
config.Monitoring.Tracing.SamplingRate = 1.0; // 100% sampling in development
_logger.LogDebug("Applied development configuration overrides");
}
private void ApplyProductionOverrides(CloudNativeConfiguration config)
{
config.Database.EnableSensitiveDataLogging = false;
config.Logging.Level = "Information";
config.Monitoring.Tracing.SamplingRate = 0.01; // 1% sampling in production
_logger.LogDebug("Applied production configuration overrides");
}
private void ValidateConfiguration(CloudNativeConfiguration config)
{
var errors = new List<string>();
if (string.IsNullOrEmpty(config.Database.ConnectionString))
errors.Add("Database connection string is required");
if (string.IsNullOrEmpty(config.Security.Jwt.SecretKey))
errors.Add("JWT secret key is required");
if (string.IsNullOrEmpty(config.Monitoring.Tracing.ServiceName))
errors.Add("Service name is required for tracing");
if (errors.Any())
{
var errorMessage = string.Join("; ", errors);
_logger.LogError("Configuration validation failed: {Errors}", errorMessage);
throw new InvalidOperationException($"Configuration validation failed: {errorMessage}");
}
}
}
// 无状态服务设计
public abstract class StatelessService
{
protected readonly ILogger Logger;
protected readonly CloudNativeConfiguration Configuration;
protected StatelessService(ILogger logger, CloudNativeConfiguration configuration)
{
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
Configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
}
// 确保服务是无状态的
protected void EnsureStateless()
{
// 检查是否有任何实例字段存储状态
var fields = GetType().GetFields(BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public)
.Where(f => !f.IsInitOnly && !f.Name.StartsWith("_") && f.FieldType != typeof(ILogger) && !f.FieldType.IsAssignableFrom(typeof(CloudNativeConfiguration)))
.ToList();
if (fields.Any())
{
var fieldNames = string.Join(", ", fields.Select(f => f.Name));
Logger.LogWarning("Service {ServiceType} may not be stateless. Found mutable fields: {Fields}", GetType().Name, fieldNames);
}
}
}
// 可观测性实现
public class ObservabilityService
{
private readonly ILogger<ObservabilityService> _logger;
private readonly IMetrics _metrics;
private readonly ActivitySource _activitySource;
public ObservabilityService(ILogger<ObservabilityService> logger, IMetrics metrics)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_activitySource = new ActivitySource("UserService");
}
public Activity StartActivity(string operationName, ActivityKind kind = ActivityKind.Internal)
{
return _activitySource.StartActivity(operationName, kind);
}
public void RecordMetric(string name, double value, params KeyValuePair<string, object>[] tags)
{
_metrics.CreateHistogram<double>(name).Record(value, tags);
}
public void LogStructured(LogLevel level, string messageTemplate, params object[] args)
{
_logger.Log(level, messageTemplate, args);
}
public IDisposable BeginScope(string operationName, params KeyValuePair<string, object>[] properties)
{
var scopeProperties = new Dictionary<string, object> { ["Operation"] = operationName };
foreach (var property in properties)
{
scopeProperties[property.Key] = property.Value;
}
return _logger.BeginScope(scopeProperties);
}
}
// 优雅关闭处理
public class GracefulShutdownService : IHostedService
{
private readonly ILogger<GracefulShutdownService> _logger;
private readonly IHostApplicationLifetime _applicationLifetime;
private readonly IServiceProvider _serviceProvider;
public GracefulShutdownService(ILogger<GracefulShutdownService> logger, IHostApplicationLifetime applicationLifetime, IServiceProvider serviceProvider)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_applicationLifetime = applicationLifetime ?? throw new ArgumentNullException(nameof(applicationLifetime));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}
public Task StartAsync(CancellationToken cancellationToken)
{
_applicationLifetime.ApplicationStopping.Register(OnStopping);
_applicationLifetime.ApplicationStopped.Register(OnStopped);
_logger.LogInformation("Graceful shutdown service started");
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Graceful shutdown service stopped");
return Task.CompletedTask;
}
private void OnStopping()
{
_logger.LogInformation("Application is stopping, initiating graceful shutdown...");
try
{
// 停止接受新请求
// 完成正在处理的请求
// 关闭数据库连接
// 关闭消息队列连接
var disposableServices = _serviceProvider.GetServices<IDisposable>();
foreach (var service in disposableServices)
{
try
{
service.Dispose();
}
catch (Exception ex)
{
_logger.LogError(ex, "Error disposing service {ServiceType}", service.GetType().Name);
}
}
_logger.LogInformation("Graceful shutdown completed");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during graceful shutdown");
}
}
private void OnStopped()
{
_logger.LogInformation("Application stopped");
}
}
```csharp
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.ComponentModel.DataAnnotations;
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
// 领域驱动设计 - 聚合根
public abstract class AggregateRoot<TId>
{
public TId Id { get; protected set; }
public DateTime CreatedAt { get; protected set; }
public DateTime UpdatedAt { get; protected set; }
private readonly List<IDomainEvent> _domainEvents = new();
public IReadOnlyList<IDomainEvent> DomainEvents => _domainEvents.AsReadOnly();
protected void AddDomainEvent(IDomainEvent domainEvent)
{
_domainEvents.Add(domainEvent);
}
public void ClearDomainEvents()
{
_domainEvents.Clear();
}
}
// 领域事件接口
public interface IDomainEvent
{
DateTime OccurredOn { get; }
string EventType { get; }
}
// 用户聚合
public class User : AggregateRoot<Guid>
{
public string Email { get; private set; }
public string FirstName { get; private set; }
public string LastName { get; private set; }
public UserStatus Status { get; private set; }
private User() { } // EF Core
public User(string email, string firstName, string lastName)
{
Id = Guid.NewGuid();
Email = email ?? throw new ArgumentNullException(nameof(email));
FirstName = firstName ?? throw new ArgumentNullException(nameof(firstName));
LastName = lastName ?? throw new ArgumentNullException(nameof(lastName));
Status = UserStatus.Active;
CreatedAt = DateTime.UtcNow;
UpdatedAt = DateTime.UtcNow;
AddDomainEvent(new UserCreatedEvent(Id, Email, FirstName, LastName));
}
public void UpdateProfile(string firstName, string lastName)
{
FirstName = firstName ?? throw new ArgumentNullException(nameof(firstName));
LastName = lastName ?? throw new ArgumentNullException(nameof(lastName));
UpdatedAt = DateTime.UtcNow;
AddDomainEvent(new UserProfileUpdatedEvent(Id, FirstName, LastName));
}
public void Deactivate()
{
if (Status == UserStatus.Inactive)
throw new InvalidOperationException("User is already inactive");
Status = UserStatus.Inactive;
UpdatedAt = DateTime.UtcNow;
AddDomainEvent(new UserDeactivatedEvent(Id));
}
}
public enum UserStatus
{
Active,
Inactive,
Suspended
}
// 领域事件实现
public record UserCreatedEvent(Guid UserId, string Email, string FirstName, string LastName) : IDomainEvent
{
public DateTime OccurredOn { get; } = DateTime.UtcNow;
public string EventType { get; } = nameof(UserCreatedEvent);
}
public record UserProfileUpdatedEvent(Guid UserId, string FirstName, string LastName) : IDomainEvent
{
public DateTime OccurredOn { get; } = DateTime.UtcNow;
public string EventType { get; } = nameof(UserProfileUpdatedEvent);
}
public record UserDeactivatedEvent(Guid UserId) : IDomainEvent
{
public DateTime OccurredOn { get; } = DateTime.UtcNow;
public string EventType { get; } = nameof(UserDeactivatedEvent);
}
// 仓储模式
public interface IUserRepository
{
Task<User> GetByIdAsync(Guid id, CancellationToken cancellationToken = default);
Task<User> GetByEmailAsync(string email, CancellationToken cancellationToken = default);
Task<IEnumerable<User>> GetAllAsync(CancellationToken cancellationToken = default);
Task AddAsync(User user, CancellationToken cancellationToken = default);
Task UpdateAsync(User user, CancellationToken cancellationToken = default);
Task DeleteAsync(Guid id, CancellationToken cancellationToken = default);
}
public class UserRepository : IUserRepository
{
private readonly UserDbContext _context;
public UserRepository(UserDbContext context)
{
_context = context ?? throw new ArgumentNullException(nameof(context));
}
public async Task<User> GetByIdAsync(Guid id, CancellationToken cancellationToken = default)
{
return await _context.Users.FindAsync(new object[] { id }, cancellationToken);
}
public async Task<User> GetByEmailAsync(string email, CancellationToken cancellationToken = default)
{
return await _context.Users
.FirstOrDefaultAsync(u => u.Email == email, cancellationToken);
}
public async Task<IEnumerable<User>> GetAllAsync(CancellationToken cancellationToken = default)
{
return await _context.Users.ToListAsync(cancellationToken);
}
public async Task AddAsync(User user, CancellationToken cancellationToken = default)
{
await _context.Users.AddAsync(user, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);
}
public async Task UpdateAsync(User user, CancellationToken cancellationToken = default)
{
_context.Users.Update(user);
await _context.SaveChangesAsync(cancellationToken);
}
public async Task DeleteAsync(Guid id, CancellationToken cancellationToken = default)
{
var user = await GetByIdAsync(id, cancellationToken);
if (user != null)
{
_context.Users.Remove(user);
await _context.SaveChangesAsync(cancellationToken);
}
}
}
// 应用服务
public interface IUserService
{
Task<UserDto> CreateUserAsync(CreateUserRequest request, CancellationToken cancellationToken = default);
Task<UserDto> GetUserAsync(Guid id, CancellationToken cancellationToken = default);
Task<UserDto> UpdateUserAsync(Guid id, UpdateUserRequest request, CancellationToken cancellationToken = default);
Task DeactivateUserAsync(Guid id, CancellationToken cancellationToken = default);
Task<IEnumerable<UserDto>> GetAllUsersAsync(CancellationToken cancellationToken = default);
}
public class UserService : IUserService
{
private readonly IUserRepository _userRepository;
private readonly IDomainEventDispatcher _eventDispatcher;
public UserService(IUserRepository userRepository, IDomainEventDispatcher eventDispatcher)
{
_userRepository = userRepository ?? throw new ArgumentNullException(nameof(userRepository));
_eventDispatcher = eventDispatcher ?? throw new ArgumentNullException(nameof(eventDispatcher));
}
public async Task<UserDto> CreateUserAsync(CreateUserRequest request, CancellationToken cancellationToken = default)
{
// 验证邮箱唯一性
var existingUser = await _userRepository.GetByEmailAsync(request.Email, cancellationToken);
if (existingUser != null)
{
throw new InvalidOperationException($"User with email {request.Email} already exists");
}
var user = new User(request.Email, request.FirstName, request.LastName);
await _userRepository.AddAsync(user, cancellationToken);
// 发布领域事件
await _eventDispatcher.DispatchAsync(user.DomainEvents, cancellationToken);
user.ClearDomainEvents();
return MapToDto(user);
}
public async Task<UserDto> GetUserAsync(Guid id, CancellationToken cancellationToken = default)
{
var user = await _userRepository.GetByIdAsync(id, cancellationToken);
if (user == null)
{
throw new NotFoundException($"User with id {id} not found");
}
return MapToDto(user);
}
public async Task<UserDto> UpdateUserAsync(Guid id, UpdateUserRequest request, CancellationToken cancellationToken = default)
{
var user = await _userRepository.GetByIdAsync(id, cancellationToken);
if (user == null)
{
throw new NotFoundException($"User with id {id} not found");
}
user.UpdateProfile(request.FirstName, request.LastName);
await _userRepository.UpdateAsync(user, cancellationToken);
// 发布领域事件
await _eventDispatcher.DispatchAsync(user.DomainEvents, cancellationToken);
user.ClearDomainEvents();
return MapToDto(user);
}
public async Task DeactivateUserAsync(Guid id, CancellationToken cancellationToken = default)
{
var user = await _userRepository.GetByIdAsync(id, cancellationToken);
if (user == null)
{
throw new NotFoundException($"User with id {id} not found");
}
user.Deactivate();
await _userRepository.UpdateAsync(user, cancellationToken);
// 发布领域事件
await _eventDispatcher.DispatchAsync(user.DomainEvents, cancellationToken);
user.ClearDomainEvents();
}
public async Task<IEnumerable<UserDto>> GetAllUsersAsync(CancellationToken cancellationToken = default)
{
var users = await _userRepository.GetAllAsync(cancellationToken);
return users.Select(MapToDto);
}
private static UserDto MapToDto(User user)
{
return new UserDto
{
Id = user.Id,
Email = user.Email,
FirstName = user.FirstName,
LastName = user.LastName,
Status = user.Status.ToString(),
CreatedAt = user.CreatedAt,
UpdatedAt = user.UpdatedAt
};
}
}
// DTOs
public class UserDto
{
public Guid Id { get; set; }
public string Email { get; set; }
public string FirstName { get; set; }
public string LastName { get; set; }
public string Status { get; set; }
public DateTime CreatedAt { get; set; }
public DateTime UpdatedAt { get; set; }
}
public class CreateUserRequest
{
[Required]
[EmailAddress]
public string Email { get; set; }
[Required]
[StringLength(50, MinimumLength = 2)]
public string FirstName { get; set; }
[Required]
[StringLength(50, MinimumLength = 2)]
public string LastName { get; set; }
}
public class UpdateUserRequest
{
[Required]
[StringLength(50, MinimumLength = 2)]
public string FirstName { get; set; }
[Required]
[StringLength(50, MinimumLength = 2)]
public string LastName { get; set; }
}
// 异常类
public class NotFoundException : Exception
{
public NotFoundException(string message) : base(message) { }
}
// 领域事件调度器
public interface IDomainEventDispatcher
{
Task DispatchAsync(IEnumerable<IDomainEvent> events, CancellationToken cancellationToken = default);
}
public class DomainEventDispatcher : IDomainEventDispatcher
{
private readonly IServiceProvider _serviceProvider;
public DomainEventDispatcher(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}
public async Task DispatchAsync(IEnumerable<IDomainEvent> events, CancellationToken cancellationToken = default)
{
foreach (var domainEvent in events)
{
var handlerType = typeof(IDomainEventHandler<>).MakeGenericType(domainEvent.GetType());
var handlers = _serviceProvider.GetServices(handlerType);
foreach (var handler in handlers)
{
var method = handlerType.GetMethod("HandleAsync");
if (method != null)
{
await (Task)method.Invoke(handler, new object[] { domainEvent, cancellationToken });
}
}
}
}
}
// 领域事件处理器
public interface IDomainEventHandler<in TEvent> where TEvent : IDomainEvent
{
Task HandleAsync(TEvent domainEvent, CancellationToken cancellationToken = default);
}
public class UserCreatedEventHandler : IDomainEventHandler<UserCreatedEvent>
{
private readonly ILogger<UserCreatedEventHandler> _logger;
public UserCreatedEventHandler(ILogger<UserCreatedEventHandler> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task HandleAsync(UserCreatedEvent domainEvent, CancellationToken cancellationToken = default)
{
_logger.LogInformation("User created: {UserId} - {Email}", domainEvent.UserId, domainEvent.Email);
// 发送欢迎邮件、创建用户配置文件等
await Task.Delay(100, cancellationToken); // 模拟异步操作
}
}
// DbContext
public class UserDbContext : DbContext
{
public DbSet<User> Users { get; set; }
public UserDbContext(DbContextOptions<UserDbContext> options) : base(options) { }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<User>(entity =>
{
entity.HasKey(e => e.Id);
entity.Property(e => e.Email).IsRequired().HasMaxLength(255);
entity.Property(e => e.FirstName).IsRequired().HasMaxLength(50);
entity.Property(e => e.LastName).IsRequired().HasMaxLength(50);
entity.Property(e => e.Status).HasConversion<string>();
entity.HasIndex(e => e.Email).IsUnique();
// 忽略领域事件
entity.Ignore(e => e.DomainEvents);
});
}
}
服务间通信
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
// 消息总线接口
public interface IMessageBus
{
Task PublishAsync<T>(T message, string routingKey = null, CancellationToken cancellationToken = default) where T : class;
Task SubscribeAsync<T>(Func<T, Task> handler, string queueName = null, CancellationToken cancellationToken = default) where T : class;
}
// RabbitMQ消息总线实现
public class RabbitMQMessageBus : IMessageBus, IDisposable
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly RabbitMQOptions _options;
private readonly ILogger<RabbitMQMessageBus> _logger;
public RabbitMQMessageBus(IOptions<RabbitMQOptions> options, ILogger<RabbitMQMessageBus> logger)
{
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
var factory = new ConnectionFactory
{
HostName = _options.HostName,
Port = _options.Port,
UserName = _options.UserName,
Password = _options.Password,
VirtualHost = _options.VirtualHost
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
// 声明交换机
_channel.ExchangeDeclare(_options.ExchangeName, ExchangeType.Topic, durable: true);
}
public async Task PublishAsync<T>(T message, string routingKey = null, CancellationToken cancellationToken = default) where T : class
{
var messageType = typeof(T).Name;
routingKey ??= messageType.ToLowerInvariant();
var messageBody = JsonSerializer.Serialize(message);
var body = Encoding.UTF8.GetBytes(messageBody);
var properties = _channel.CreateBasicProperties();
properties.Persistent = true;
properties.MessageId = Guid.NewGuid().ToString();
properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
properties.Type = messageType;
_channel.BasicPublish(
exchange: _options.ExchangeName,
routingKey: routingKey,
basicProperties: properties,
body: body);
_logger.LogInformation("Published message {MessageType} with routing key {RoutingKey}", messageType, routingKey);
await Task.CompletedTask;
}
public async Task SubscribeAsync<T>(Func<T, Task> handler, string queueName = null, CancellationToken cancellationToken = default) where T : class
{
var messageType = typeof(T).Name;
queueName ??= $"{_options.ServiceName}.{messageType}";
var routingKey = messageType.ToLowerInvariant();
// 声明队列
_channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
_channel.QueueBind(queueName, _options.ExchangeName, routingKey);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var messageBody = Encoding.UTF8.GetString(body);
var message = JsonSerializer.Deserialize<T>(messageBody);
await handler(message);
_channel.BasicAck(ea.DeliveryTag, false);
_logger.LogInformation("Processed message {MessageType} from queue {QueueName}", messageType, queueName);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message {MessageType} from queue {QueueName}", messageType, queueName);
_channel.BasicNack(ea.DeliveryTag, false, true); // 重新入队
}
};
_channel.BasicConsume(queueName, autoAck: false, consumer);
_logger.LogInformation("Started consuming messages from queue {QueueName}", queueName);
await Task.CompletedTask;
}
public void Dispose()
{
_channel?.Dispose();
_connection?.Dispose();
}
}
// RabbitMQ配置
public class RabbitMQOptions
{
public string HostName { get; set; } = "localhost";
public int Port { get; set; } = 5672;
public string UserName { get; set; } = "guest";
public string Password { get; set; } = "guest";
public string VirtualHost { get; set; } = "/";
public string ExchangeName { get; set; } = "microservices.events";
public string ServiceName { get; set; } = "unknown";
}
// HTTP客户端服务
public interface IOrderService
{
Task<OrderDto> GetOrderAsync(Guid orderId, CancellationToken cancellationToken = default);
Task<OrderDto> CreateOrderAsync(CreateOrderRequest request, CancellationToken cancellationToken = default);
}
public class OrderService : IOrderService
{
private readonly HttpClient _httpClient;
private readonly ILogger<OrderService> _logger;
public OrderService(HttpClient httpClient, ILogger<OrderService> logger)
{
_httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<OrderDto> GetOrderAsync(Guid orderId, CancellationToken cancellationToken = default)
{
try
{
var response = await _httpClient.GetAsync($"/api/orders/{orderId}", cancellationToken);
response.EnsureSuccessStatusCode();
var content = await response.Content.ReadAsStringAsync(cancellationToken);
var order = JsonSerializer.Deserialize<OrderDto>(content, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
});
return order;
}
catch (HttpRequestException ex)
{
_logger.LogError(ex, "Error getting order {OrderId}", orderId);
throw;
}
}
public async Task<OrderDto> CreateOrderAsync(CreateOrderRequest request, CancellationToken cancellationToken = default)
{
try
{
var json = JsonSerializer.Serialize(request, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
});
var content = new StringContent(json, Encoding.UTF8, "application/json");
var response = await _httpClient.PostAsync("/api/orders", content, cancellationToken);
response.EnsureSuccessStatusCode();
var responseContent = await response.Content.ReadAsStringAsync(cancellationToken);
var order = JsonSerializer.Deserialize<OrderDto>(responseContent, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
});
return order;
}
catch (HttpRequestException ex)
{
_logger.LogError(ex, "Error creating order for user {UserId}", request.UserId);
throw;
}
}
}
// DTOs for inter-service communication
public class OrderDto
{
public Guid Id { get; set; }
public Guid UserId { get; set; }
public decimal TotalAmount { get; set; }
public string Status { get; set; }
public DateTime CreatedAt { get; set; }
public List<OrderItemDto> Items { get; set; } = new();
}
public class OrderItemDto
{
public Guid ProductId { get; set; }
public string ProductName { get; set; }
public int Quantity { get; set; }
public decimal UnitPrice { get; set; }
}
public class CreateOrderRequest
{
public Guid UserId { get; set; }
public List<CreateOrderItemRequest> Items { get; set; } = new();
}
public class CreateOrderItemRequest
{
public Guid ProductId { get; set; }
public int Quantity { get; set; }
}
// 集成事件
public record UserCreatedIntegrationEvent(Guid UserId, string Email, string FirstName, string LastName, DateTime CreatedAt);
public record OrderCreatedIntegrationEvent(Guid OrderId, Guid UserId, decimal TotalAmount, DateTime CreatedAt);
// 集成事件处理器
public class UserCreatedIntegrationEventHandler
{
private readonly IOrderService _orderService;
private readonly ILogger<UserCreatedIntegrationEventHandler> _logger;
public UserCreatedIntegrationEventHandler(IOrderService orderService, ILogger<UserCreatedIntegrationEventHandler> logger)
{
_orderService = orderService ?? throw new ArgumentNullException(nameof(orderService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task HandleAsync(UserCreatedIntegrationEvent integrationEvent)
{
_logger.LogInformation("Handling user created event for user {UserId}", integrationEvent.UserId);
// 为新用户创建欢迎订单或执行其他业务逻辑
// 这里只是示例,实际业务逻辑会更复杂
await Task.Delay(100); // 模拟处理时间
_logger.LogInformation("Completed handling user created event for user {UserId}", integrationEvent.UserId);
}
}
23.2 容器化和Docker
Dockerfile优化
# 多阶段构建 - 优化镜像大小
# 构建阶段
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
# 复制项目文件并还原依赖
COPY ["UserService/UserService.csproj", "UserService/"]
COPY ["UserService.Domain/UserService.Domain.csproj", "UserService.Domain/"]
COPY ["UserService.Infrastructure/UserService.Infrastructure.csproj", "UserService.Infrastructure/"]
COPY ["UserService.Application/UserService.Application.csproj", "UserService.Application/"]
RUN dotnet restore "UserService/UserService.csproj"
# 复制所有源代码
COPY . .
WORKDIR "/src/UserService"
# 构建应用
RUN dotnet build "UserService.csproj" -c Release -o /app/build
# 发布阶段
FROM build AS publish
RUN dotnet publish "UserService.csproj" -c Release -o /app/publish /p:UseAppHost=false
# 运行时阶段
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS final
WORKDIR /app
# 创建非root用户
RUN adduser --disabled-password --gecos '' appuser
# 复制发布的应用
COPY --from=publish /app/publish .
# 设置文件权限
RUN chown -R appuser:appuser /app
USER appuser
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8080/health || exit 1
# 暴露端口
EXPOSE 8080
# 设置环境变量
ENV ASPNETCORE_URLS=http://+:8080
ENV ASPNETCORE_ENVIRONMENT=Production
# 启动应用
ENTRYPOINT ["dotnet", "UserService.dll"]
Docker Compose配置
# docker-compose.yml
version: '3.8'
services:
# 用户服务
user-service:
build:
context: .
dockerfile: UserService/Dockerfile
ports:
- "5001:8080"
environment:
- ASPNETCORE_ENVIRONMENT=Development
- ConnectionStrings__DefaultConnection=Server=user-db;Database=UserService;User Id=sa;Password=YourPassword123;TrustServerCertificate=true
- RabbitMQ__HostName=rabbitmq
- RabbitMQ__ServiceName=user-service
depends_on:
- user-db
- rabbitmq
networks:
- microservices-network
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
# 订单服务
order-service:
build:
context: .
dockerfile: OrderService/Dockerfile
ports:
- "5002:8080"
environment:
- ASPNETCORE_ENVIRONMENT=Development
- ConnectionStrings__DefaultConnection=Server=order-db;Database=OrderService;User Id=sa;Password=YourPassword123;TrustServerCertificate=true
- RabbitMQ__HostName=rabbitmq
- RabbitMQ__ServiceName=order-service
- Services__UserService=http://user-service:8080
depends_on:
- order-db
- rabbitmq
networks:
- microservices-network
restart: unless-stopped
# 用户数据库
user-db:
image: mcr.microsoft.com/mssql/server:2022-latest
environment:
- ACCEPT_EULA=Y
- SA_PASSWORD=YourPassword123
- MSSQL_PID=Express
ports:
- "1433:1433"
volumes:
- user-db-data:/var/opt/mssql
networks:
- microservices-network
restart: unless-stopped
# 订单数据库
order-db:
image: mcr.microsoft.com/mssql/server:2022-latest
environment:
- ACCEPT_EULA=Y
- SA_PASSWORD=YourPassword123
- MSSQL_PID=Express
ports:
- "1434:1433"
volumes:
- order-db-data:/var/opt/mssql
networks:
- microservices-network
restart: unless-stopped
# RabbitMQ消息队列
rabbitmq:
image: rabbitmq:3.12-management
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin123
ports:
- "5672:5672"
- "15672:15672"
volumes:
- rabbitmq-data:/var/lib/rabbitmq
networks:
- microservices-network
restart: unless-stopped
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 30s
timeout: 10s
retries: 3
# Redis缓存
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
networks:
- microservices-network
restart: unless-stopped
command: redis-server --appendonly yes
# API网关
api-gateway:
build:
context: .
dockerfile: ApiGateway/Dockerfile
ports:
- "5000:8080"
environment:
- ASPNETCORE_ENVIRONMENT=Development
- Services__UserService=http://user-service:8080
- Services__OrderService=http://order-service:8080
depends_on:
- user-service
- order-service
networks:
- microservices-network
restart: unless-stopped
volumes:
user-db-data:
order-db-data:
rabbitmq-data:
redis-data:
networks:
microservices-network:
driver: bridge
健康检查实现
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.EntityFrameworkCore;
// 健康检查配置
public static class HealthCheckExtensions
{
public static IServiceCollection AddCustomHealthChecks(this IServiceCollection services, IConfiguration configuration)
{
services.AddHealthChecks()
.AddCheck("self", () => HealthCheckResult.Healthy())
.AddDbContextCheck<UserDbContext>("database")
.AddRabbitMQ(configuration.GetConnectionString("RabbitMQ"), name: "rabbitmq")
.AddRedis(configuration.GetConnectionString("Redis"), name: "redis")
.AddUrlGroup(new Uri(configuration["Services:OrderService"] + "/health"), "order-service")
.AddCheck<ExternalServiceHealthCheck>("external-services");
return services;
}
}
// 自定义健康检查
public class ExternalServiceHealthCheck : IHealthCheck
{
private readonly IOrderService _orderService;
private readonly ILogger<ExternalServiceHealthCheck> _logger;
public ExternalServiceHealthCheck(IOrderService orderService, ILogger<ExternalServiceHealthCheck> logger)
{
_orderService = orderService ?? throw new ArgumentNullException(nameof(orderService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
try
{
// 检查外部服务连接
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
// 这里可以调用一个轻量级的健康检查端点
// 而不是实际的业务方法
await Task.Delay(100, cts.Token); // 模拟检查
return HealthCheckResult.Healthy("External services are responding");
}
catch (OperationCanceledException)
{
_logger.LogWarning("Health check timed out");
return HealthCheckResult.Degraded("External services are slow to respond");
}
catch (Exception ex)
{
_logger.LogError(ex, "Health check failed");
return HealthCheckResult.Unhealthy("External services are not responding", ex);
}
}
}
// 健康检查响应
public class HealthCheckResponse
{
public string Status { get; set; }
public TimeSpan TotalDuration { get; set; }
public Dictionary<string, HealthCheckEntry> Entries { get; set; } = new();
}
public class HealthCheckEntry
{
public string Status { get; set; }
public TimeSpan Duration { get; set; }
public string Description { get; set; }
public object Data { get; set; }
}
// 健康检查中间件
public class HealthCheckMiddleware
{
private readonly RequestDelegate _next;
private readonly HealthCheckService _healthCheckService;
private readonly ILogger<HealthCheckMiddleware> _logger;
public HealthCheckMiddleware(RequestDelegate next, HealthCheckService healthCheckService, ILogger<HealthCheckMiddleware> logger)
{
_next = next ?? throw new ArgumentNullException(nameof(next));
_healthCheckService = healthCheckService ?? throw new ArgumentNullException(nameof(healthCheckService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task InvokeAsync(HttpContext context)
{
if (context.Request.Path.StartsWithSegments("/health"))
{
var healthReport = await _healthCheckService.CheckHealthAsync();
var response = new HealthCheckResponse
{
Status = healthReport.Status.ToString(),
TotalDuration = healthReport.TotalDuration,
Entries = healthReport.Entries.ToDictionary(
kvp => kvp.Key,
kvp => new HealthCheckEntry
{
Status = kvp.Value.Status.ToString(),
Duration = kvp.Value.Duration,
Description = kvp.Value.Description,
Data = kvp.Value.Data
})
};
context.Response.ContentType = "application/json";
context.Response.StatusCode = healthReport.Status == HealthStatus.Healthy ? 200 : 503;
var json = JsonSerializer.Serialize(response, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = true
});
await context.Response.WriteAsync(json);
return;
}
await _next(context);
}
}