## 13.1 API网关架构概览
API网关是低代码平台中连接内部服务和外部系统的关键组件，它提供了统一的API入口、请求路由、认证授权、限流熔断、监控日志等功能。
### 13.1.1 API网关管理器接口
## 13.5 熔断器
### 13.5.1 熔断器接口和实现
```typescript
// Circuit breaker interface
/**
 * Contract for a circuit breaker that tracks one breaker per service id.
 */
interface CircuitBreaker {
  /** Returns true when calls to `serviceId` should currently be rejected. */
  isOpen(serviceId: string): boolean;
  /** Records the outcome of a successful call to `serviceId`. */
  recordSuccess(serviceId: string): void;
  /** Records the outcome of a failed call to `serviceId`. */
  recordFailure(serviceId: string): void;
  /** Returns the current breaker state for `serviceId`. */
  getState(serviceId: string): CircuitBreakerState;
  /** Puts the breaker for `serviceId` back into its initial state. */
  reset(serviceId: string): void;
}
// Circuit breaker state
/**
 * The three states a breaker can be in. DefaultCircuitBreaker treats a
 * service with no recorded state as CLOSED.
 */
enum CircuitBreakerState {
  // Calls flow normally.
  CLOSED = 'closed',
  // Calls are rejected until the recovery timeout elapses.
  OPEN = 'open',
  // Trial state entered after the recovery timeout; the next outcome decides.
  HALF_OPEN = 'half_open'
}
// Circuit breaker statistics
/**
 * Per-service counters maintained by a circuit breaker.
 */
interface CircuitBreakerStats {
  /** Total recorded outcomes (successes + failures). */
  requests: number;
  /** Number of recorded failures. */
  failures: number;
  /** Number of recorded successes. */
  successes: number;
  /** When the most recent failure was recorded, if any. */
  lastFailureTime?: Date;
  /** When the most recent success was recorded, if any. */
  lastSuccessTime?: Date;
}
// Default circuit breaker implementation
/**
 * In-memory circuit breaker that keeps independent state, statistics and
 * configuration per service id.
 *
 * State machine:
 * - CLOSED -> OPEN when at least `monitoringPeriod` outcomes have been
 *   recorded and the failure ratio reaches `failureThreshold`.
 * - OPEN -> HALF_OPEN lazily, inside `isOpen`, once `recoveryTimeout`
 *   seconds have elapsed since the breaker opened.
 * - HALF_OPEN -> CLOSED on the first success (statistics are cleared).
 * - HALF_OPEN -> OPEN on the first failure.
 *
 * Statistics are cumulative: they are only cleared when the breaker closes
 * from HALF_OPEN or `reset` is called, not on a rolling time window.
 */
class DefaultCircuitBreaker implements CircuitBreaker {
  private readonly states = new Map<string, CircuitBreakerState>();
  private readonly stats = new Map<string, CircuitBreakerStats>();
  private readonly configs = new Map<string, CircuitBreakerConfig>();
  // Epoch milliseconds at which each service's breaker last opened.
  private readonly openTimestamps = new Map<string, number>();

  /**
   * @param defaultConfig Configuration used for services without an override.
   */
  constructor(private readonly defaultConfig: CircuitBreakerConfig) {}

  /** Overrides the configuration for a single service. */
  setConfig(serviceId: string, config: CircuitBreakerConfig): void {
    this.configs.set(serviceId, config);
  }

  /**
   * Tells the caller whether requests to `serviceId` must be rejected.
   * As a side effect, an OPEN breaker whose recovery timeout has elapsed is
   * moved to HALF_OPEN and the call is allowed through.
   *
   * @returns true only while the breaker is OPEN and still cooling down.
   */
  isOpen(serviceId: string): boolean {
    if (this.getState(serviceId) !== CircuitBreakerState.OPEN) {
      return false;
    }

    const openedAt = this.openTimestamps.get(serviceId) ?? 0;
    const { recoveryTimeout } = this.getConfig(serviceId);
    // recoveryTimeout is multiplied by 1000, i.e. it is expressed in seconds.
    if (Date.now() - openedAt >= recoveryTimeout * 1000) {
      this.states.set(serviceId, CircuitBreakerState.HALF_OPEN);
      return false;
    }
    return true;
  }

  /** Records a success; a success while HALF_OPEN closes the breaker. */
  recordSuccess(serviceId: string): void {
    const stats = this.getStats(serviceId);
    stats.requests++;
    stats.successes++;
    stats.lastSuccessTime = new Date();

    if (this.getState(serviceId) === CircuitBreakerState.HALF_OPEN) {
      this.states.set(serviceId, CircuitBreakerState.CLOSED);
      this.resetStats(serviceId);
    }
  }

  /**
   * Records a failure and opens the breaker when warranted.
   *
   * A failure while HALF_OPEN re-opens the breaker immediately: the trial
   * call failed, so the cumulative failure ratio (which may have been diluted
   * by successes recorded while OPEN) must not keep the breaker half-open.
   */
  recordFailure(serviceId: string): void {
    const stats = this.getStats(serviceId);
    stats.requests++;
    stats.failures++;
    stats.lastFailureTime = new Date();

    const state = this.getState(serviceId);
    if (state === CircuitBreakerState.HALF_OPEN) {
      this.trip(serviceId);
      return;
    }
    if (state !== CircuitBreakerState.CLOSED) {
      // Already OPEN: keep counting but leave the open timestamp untouched.
      return;
    }

    const config = this.getConfig(serviceId);
    // monitoringPeriod acts as a minimum sample size before tripping.
    const hasEnoughSamples = stats.requests >= config.monitoringPeriod;
    const failureRate = stats.failures / stats.requests;
    if (hasEnoughSamples && failureRate >= config.failureThreshold) {
      this.trip(serviceId);
    }
  }

  /** Returns the stored state, defaulting to CLOSED for unknown services. */
  getState(serviceId: string): CircuitBreakerState {
    return this.states.get(serviceId) ?? CircuitBreakerState.CLOSED;
  }

  /** Forces the breaker to CLOSED and clears its statistics and timestamp. */
  reset(serviceId: string): void {
    this.states.set(serviceId, CircuitBreakerState.CLOSED);
    this.resetStats(serviceId);
    this.openTimestamps.delete(serviceId);
  }

  /** Moves the breaker to OPEN and remembers when that happened. */
  private trip(serviceId: string): void {
    this.states.set(serviceId, CircuitBreakerState.OPEN);
    this.openTimestamps.set(serviceId, Date.now());
  }

  /** Returns the mutable stats record for a service, creating it on demand. */
  private getStats(serviceId: string): CircuitBreakerStats {
    let current = this.stats.get(serviceId);
    if (!current) {
      current = { requests: 0, failures: 0, successes: 0 };
      this.stats.set(serviceId, current);
    }
    return current;
  }

  /** Replaces the stats record for a service with zeroed counters. */
  private resetStats(serviceId: string): void {
    this.stats.set(serviceId, { requests: 0, failures: 0, successes: 0 });
  }

  /** Returns the per-service configuration or the default one. */
  private getConfig(serviceId: string): CircuitBreakerConfig {
    return this.configs.get(serviceId) ?? this.defaultConfig;
  }
}
```
## 13.6 限流器
### 13.6.1 限流器接口和实现
```typescript
// Rate limiter interface
/**
 * Contract for a keyed rate limiter. All operations are asynchronous.
 */
interface RateLimiter {
  /**
   * Decides whether one more request for `key` may proceed.
   * @param config Optional limit to use instead of the limiter's default.
   */
  isAllowed(key: string, config?: RateLimitConfig): Promise<boolean>;
  /** Returns how many requests `key` may still make. */
  getRemainingRequests(key: string): Promise<number>;
  /** Discards any limiter state held for `key`. */
  reset(key: string): Promise<void>;
}
// Token bucket rate limiter
/**
 * Rate limiter that keeps one TokenBucket per key.
 *
 * A bucket is created the first time a key is seen and keeps the
 * configuration it was created with; a different `config` passed on later
 * calls for the same key is not applied.
 */
class TokenBucketRateLimiter implements RateLimiter {
  private buckets: Map<string, TokenBucket> = new Map();

  /**
   * @param defaultConfig Limit applied when `isAllowed` receives no config.
   */
  constructor(private defaultConfig: RateLimitConfig) {}

  /** Tries to take one token from the bucket belonging to `key`. */
  async isAllowed(key: string, config?: RateLimitConfig): Promise<boolean> {
    const effectiveConfig = config ? config : this.defaultConfig;
    return this.getBucket(key, effectiveConfig).consume();
  }

  /**
   * Returns the tokens left for `key`, or the default request budget when
   * no bucket exists for that key yet.
   */
  async getRemainingRequests(key: string): Promise<number> {
    const existing = this.buckets.get(key);
    if (!existing) {
      return this.defaultConfig.requests;
    }
    return existing.getTokens();
  }

  /** Drops the bucket for `key`; the next request starts from a full bucket. */
  async reset(key: string): Promise<void> {
    this.buckets.delete(key);
  }

  /** Looks up the bucket for `key`, creating it with `config` if missing. */
  private getBucket(key: string, config: RateLimitConfig): TokenBucket {
    const known = this.buckets.has(key);
    if (!known) {
      const created = new TokenBucket(config);
      this.buckets.set(key, created);
    }
    return this.buckets.get(key)!;
  }
}
// Token bucket
/**
 * A single token bucket holding at most `config.requests` tokens and
 * refilling linearly over `config.window` seconds.
 *
 * The token count is fractional: refills add a proportional share of the
 * budget for the time elapsed since the previous refill.
 */
class TokenBucket {
  private tokens: number;
  private lastRefill: number;
  private config: RateLimitConfig;

  /** Starts with a full bucket and the refill clock set to now. */
  constructor(config: RateLimitConfig) {
    this.lastRefill = Date.now();
    this.tokens = config.requests;
    this.config = config;
  }

  /**
   * Refills, then removes `tokens` tokens if that many are available.
   * @returns true when the tokens were taken, false when the bucket is short.
   */
  consume(tokens: number = 1): boolean {
    this.refill();
    const hasEnough = this.tokens >= tokens;
    if (!hasEnough) {
      return false;
    }
    this.tokens -= tokens;
    return true;
  }

  /** Refills and returns the current (possibly fractional) token count. */
  getTokens(): number {
    this.refill();
    return this.tokens;
  }

  /** Tops the bucket up according to the time elapsed since the last refill. */
  private refill(): void {
    const now = Date.now();
    const { window, requests } = this.config;
    // Elapsed time is converted from milliseconds to seconds.
    const elapsedSeconds = (now - this.lastRefill) / 1000;
    this.lastRefill = now;

    if (elapsedSeconds >= window) {
      // A whole window has passed: the bucket is simply full again.
      this.tokens = requests;
      return;
    }

    // Partial window: add the proportional share, capped at the capacity.
    const replenished = (elapsedSeconds / window) * requests;
    this.tokens = Math.min(requests, this.tokens + replenished);
  }
}
// Sliding window rate limiter
/**
 * Rate limiter that keeps one SlidingWindow per key.
 *
 * A window is created the first time a key is seen and keeps the
 * configuration it was created with.
 */
class SlidingWindowRateLimiter implements RateLimiter {
  private windows: Map<string, SlidingWindow> = new Map();

  /**
   * @param defaultConfig Limit applied when `isAllowed` receives no config.
   */
  constructor(private defaultConfig: RateLimitConfig) {}

  /** Asks the window belonging to `key` whether another request fits. */
  async isAllowed(key: string, config?: RateLimitConfig): Promise<boolean> {
    const effectiveConfig = config ? config : this.defaultConfig;
    return this.getWindow(key, effectiveConfig).isAllowed();
  }

  /**
   * Returns the requests left for `key`, or the default request budget when
   * no window exists for that key yet.
   */
  async getRemainingRequests(key: string): Promise<number> {
    const existing = this.windows.get(key);
    if (!existing) {
      return this.defaultConfig.requests;
    }
    return existing.getRemainingRequests();
  }

  /** Drops the window for `key`. */
  async reset(key: string): Promise<void> {
    this.windows.delete(key);
  }

  /** Looks up the window for `key`, creating it with `config` if missing. */
  private getWindow(key: string, config: RateLimitConfig): SlidingWindow {
    const known = this.windows.has(key);
    if (!known) {
      const created = new SlidingWindow(config);
      this.windows.set(key, created);
    }
    return this.windows.get(key)!;
  }
}
// Sliding window
/**
 * Sliding-window request log: remembers the timestamp of every accepted
 * request and admits a new one only while fewer than `config.requests`
 * timestamps fall inside the last `config.window` seconds.
 */
class SlidingWindow {
  private requests: number[] = [];
  private config: RateLimitConfig;

  constructor(config: RateLimitConfig) {
    this.config = config;
  }

  /**
   * Prunes expired timestamps and, if there is room, records the current
   * request.
   * @returns true when the request was admitted and recorded.
   */
  isAllowed(): boolean {
    const now = Date.now();
    this.dropExpired(now);

    const hasRoom = this.requests.length < this.config.requests;
    if (hasRoom) {
      this.requests.push(now);
    }
    return hasRoom;
  }

  /** Prunes expired timestamps and returns the remaining budget (never negative). */
  getRemainingRequests(): number {
    this.dropExpired(Date.now());
    const remaining = this.config.requests - this.requests.length;
    return Math.max(0, remaining);
  }

  /** Keeps only timestamps strictly newer than the start of the window. */
  private dropExpired(now: number): void {
    // The window length is given in seconds; timestamps are milliseconds.
    const windowStart = now - (this.config.window * 1000);
    this.requests = this.requests.filter(recordedAt => recordedAt > windowStart);
  }
}
```
## 13.7 指标收集器
### 13.7.1 指标收集器接口和实现
```typescript
// Metrics collector interface
/**
 * Contract for collecting per-request data and turning it into aggregates.
 */
interface MetricsCollector {
  /** Records that a request entered the gateway. */
  recordRequest(request: GatewayRequest): void;
  /** Records the response and its latency for a previously recorded request. */
  recordResponse(response: GatewayResponse, latency: number): void;
  /** Records an error for a previously recorded request. */
  recordError(error: Error): void;
  /** Computes aggregated metrics, optionally restricted by `query`. */
  getMetrics(query?: MetricsQuery): ApiMetrics;
  /** Discards everything collected so far. */
  reset(): void;
}
// Request record
/**
 * One entry of the metrics log. The optional outcome fields stay undefined
 * until a response or error is recorded for the request.
 */
interface RequestRecord {
  /** Epoch milliseconds (DefaultMetricsCollector stores Date.now()). */
  timestamp: number;
  path: string;
  method: string;
  /** Id of the service the matched route points at, when a route matched. */
  serviceId?: string;
  /** Id of the matched route, when a route matched. */
  routeId?: string;
  /** HTTP status code of the response; DefaultMetricsCollector sets 500 on errors. */
  statusCode?: number;
  /** Latency as passed to recordResponse — presumably milliseconds; confirm with the caller. */
  latency?: number;
  /** Message of the error recorded for this request, if any. */
  error?: string;
}
// Default metrics collector
/**
 * In-memory metrics collector backed by a bounded log of request records.
 *
 * Responses and errors are attached to the most recently recorded request,
 * so callers must invoke `recordRequest` and then `recordResponse` /
 * `recordError` for the same request without another `recordRequest` in
 * between; interleaved in-flight requests are attributed to the wrong record.
 */
class DefaultMetricsCollector implements MetricsCollector {
  private records: RequestRecord[] = [];
  private readonly maxRecords: number;

  /**
   * @param maxRecords Maximum number of records kept; the oldest are dropped
   *   first. Negative values are treated as 0 (nothing is retained).
   */
  constructor(maxRecords: number = 10000) {
    this.maxRecords = Math.max(0, Math.floor(maxRecords));
  }

  /** Appends a new record for an incoming request. */
  recordRequest(request: GatewayRequest): void {
    this.addRecord({
      timestamp: Date.now(),
      path: request.path,
      method: request.method,
      serviceId: request.route?.serviceId,
      routeId: request.route?.id
    });
  }

  /** Stores status code and latency on the most recent record, if any. */
  recordResponse(response: GatewayResponse, latency: number): void {
    const latest = this.latestRecord();
    if (latest) {
      latest.statusCode = response.statusCode;
      latest.latency = latency;
    }
  }

  /** Stores the error message on the most recent record and marks it as 500. */
  recordError(error: Error): void {
    const latest = this.latestRecord();
    if (latest) {
      latest.error = error.message;
      latest.statusCode = 500;
    }
  }

  /**
   * Aggregates the retained records.
   * @param query Optional time range / service / route restriction.
   */
  getMetrics(query: MetricsQuery = {}): ApiMetrics {
    const selected = this.filterRecords(query);
    return {
      requests: this.calculateRequestMetrics(selected),
      latency: this.calculateLatencyMetrics(selected),
      errors: this.calculateErrorMetrics(selected),
      services: this.calculateServiceMetrics(selected),
      routes: this.calculateRouteMetrics(selected)
    };
  }

  /** Drops every retained record. */
  reset(): void {
    this.records = [];
  }

  /** Returns the most recently added record, or undefined when empty. */
  private latestRecord(): RequestRecord | undefined {
    return this.records[this.records.length - 1];
  }

  /** Appends a record and evicts the oldest ones beyond `maxRecords`. */
  private addRecord(record: RequestRecord): void {
    this.records.push(record);
    // Computing the excess explicitly also handles maxRecords === 0, where
    // slice(-0) would have kept the whole array.
    const excess = this.records.length - this.maxRecords;
    if (excess > 0) {
      this.records.splice(0, excess);
    }
  }

  /** Returns the records matching every criterion present in `query`. */
  private filterRecords(query: MetricsQuery): RequestRecord[] {
    const from = query.startTime ? query.startTime.getTime() : undefined;
    const to = query.endTime ? query.endTime.getTime() : undefined;
    const { serviceId, routeId } = query;

    return this.records.filter(r =>
      (from === undefined || r.timestamp >= from) &&
      (to === undefined || r.timestamp <= to) &&
      (!serviceId || r.serviceId === serviceId) &&
      (!routeId || r.routeId === routeId)
    );
  }

  /** True when the record carries an error message or a status code >= 400. */
  private isErrorRecord(record: RequestRecord): boolean {
    return Boolean(record.error) ||
      (record.statusCode !== undefined && record.statusCode >= 400);
  }

  /** Arithmetic mean of `values`, or 0 for an empty list. */
  private average(values: number[]): number {
    if (values.length === 0) return 0;
    return values.reduce((sum, value) => sum + value, 0) / values.length;
  }

  /** Latencies of the records that have one, in record order. */
  private latenciesOf(records: RequestRecord[]): number[] {
    const latencies: number[] = [];
    for (const record of records) {
      if (record.latency !== undefined) {
        latencies.push(record.latency);
      }
    }
    return latencies;
  }

  /**
   * Request totals and throughput. Note that `failed` is everything that is
   * not a 2xx/3xx response, so records without a status code yet are counted
   * as failed.
   */
  private calculateRequestMetrics(records: RequestRecord[]): {
    total: number;
    successful: number;
    failed: number;
    rate: number;
  } {
    const total = records.length;
    const successful = records.filter(r =>
      r.statusCode !== undefined && r.statusCode >= 200 && r.statusCode < 400
    ).length;
    const failed = total - successful;
    // Requests per second over the span between first and last record.
    const timeSpan = this.getTimeSpan(records);
    const rate = timeSpan > 0 ? total / (timeSpan / 1000) : 0;
    return { total, successful, failed, rate };
  }

  /** p50/p95/p99 and mean latency over the records that have a latency. */
  private calculateLatencyMetrics(records: RequestRecord[]): {
    p50: number;
    p95: number;
    p99: number;
    average: number;
  } {
    const latencies = this.latenciesOf(records).sort((a, b) => a - b);
    if (latencies.length === 0) {
      return { p50: 0, p95: 0, p99: 0, average: 0 };
    }
    return {
      p50: this.percentile(latencies, 0.5),
      p95: this.percentile(latencies, 0.95),
      p99: this.percentile(latencies, 0.99),
      average: this.average(latencies)
    };
  }

  /** Error count, errors per second and a histogram keyed by status code. */
  private calculateErrorMetrics(records: RequestRecord[]): {
    total: number;
    rate: number;
    byStatusCode: Record<number, number>;
  } {
    const errorRecords = records.filter(r => this.isErrorRecord(r));
    const total = errorRecords.length;
    const timeSpan = this.getTimeSpan(records);
    const rate = timeSpan > 0 ? total / (timeSpan / 1000) : 0;

    const byStatusCode: Record<number, number> = {};
    for (const record of errorRecords) {
      if (record.statusCode) {
        byStatusCode[record.statusCode] = (byStatusCode[record.statusCode] || 0) + 1;
      }
    }
    return { total, rate, byStatusCode };
  }

  /** Per-service aggregates; records without a service go under 'unknown'. */
  private calculateServiceMetrics(records: RequestRecord[]): Record<string, ServiceMetrics> {
    const serviceMetrics: Record<string, ServiceMetrics> = {};
    const groups = this.groupBy(records, r => r.serviceId || 'unknown');
    for (const [serviceId, serviceRecords] of groups) {
      const requests = serviceRecords.length;
      const errors = serviceRecords.filter(r => this.isErrorRecord(r)).length;
      const latency = this.average(this.latenciesOf(serviceRecords));
      const availability = requests > 0 ? (requests - errors) / requests : 1;
      serviceMetrics[serviceId] = { requests, errors, latency, availability };
    }
    return serviceMetrics;
  }

  /** Per-route aggregates; records without a route go under 'unknown'. */
  private calculateRouteMetrics(records: RequestRecord[]): Record<string, RouteMetrics> {
    const routeMetrics: Record<string, RouteMetrics> = {};
    const groups = this.groupBy(records, r => r.routeId || 'unknown');
    for (const [routeId, routeRecords] of groups) {
      const requests = routeRecords.length;
      const errors = routeRecords.filter(r => this.isErrorRecord(r)).length;
      const latency = this.average(this.latenciesOf(routeRecords));
      routeMetrics[routeId] = { requests, errors, latency };
    }
    return routeMetrics;
  }

  /**
   * Nearest-rank percentile of an ascending-sorted list.
   * @param p Fraction in [0, 1], e.g. 0.95.
   */
  private percentile(values: number[], p: number): number {
    if (values.length === 0) return 0;
    const index = Math.ceil(values.length * p) - 1;
    return values[Math.max(0, index)];
  }

  /**
   * Milliseconds between the earliest and latest record. Uses a plain loop
   * instead of Math.max(...array) so very large record sets cannot exceed
   * the engine's argument/stack limit.
   */
  private getTimeSpan(records: RequestRecord[]): number {
    if (records.length === 0) return 0;
    let earliest = Infinity;
    let latest = -Infinity;
    for (const { timestamp } of records) {
      if (timestamp < earliest) earliest = timestamp;
      if (timestamp > latest) latest = timestamp;
    }
    return latest - earliest;
  }

  /** Groups items by the key returned from `keyFn`, preserving first-seen order. */
  private groupBy<T, K extends string | number>(
    items: T[],
    keyFn: (item: T) => K
  ): Map<K, T[]> {
    const groups = new Map<K, T[]>();
    for (const item of items) {
      const key = keyFn(item);
      const bucket = groups.get(key);
      if (bucket) {
        bucket.push(item);
      } else {
        groups.set(key, [item]);
      }
    }
    return groups;
  }
}
```
## 13.8 服务发现
### 13.8.1 服务发现接口和实现
```typescript
// Service discovery interface
/**
 * Contract for a service registry with lookup, change notification and
 * health reporting.
 */
interface ServiceDiscovery {
  /** Adds (or replaces) a service in the registry. */
  registerService(service: Service): Promise<void>;
  /** Removes the service with the given id from the registry. */
  unregisterService(serviceId: string): Promise<void>;
  /** Lists registered services, optionally narrowed by `query`. */
  discoverServices(query?: ServiceDiscoveryQuery): Promise<Service[]>;
  /** Subscribes `callback` to registry events; call `stop()` on the result to unsubscribe. */
  watchServices(callback: ServiceWatchCallback): ServiceWatcher;
  /** Returns the last known health report for a service. */
  getServiceHealth(serviceId: string): Promise<ServiceHealth>;
}
// Service discovery query
/**
 * Optional filters for discoverServices; omitted fields do not restrict the
 * result.
 */
interface ServiceDiscoveryQuery {
  /** Service name filter (MemoryServiceDiscovery matches by substring). */
  name?: string;
  /** Tag filter (MemoryServiceDiscovery keeps services having at least one of these tags). */
  tags?: string[];
  /** Exact status filter. */
  status?: ServiceStatus;
  /** Datacenter filter; not evaluated by MemoryServiceDiscovery. */
  datacenter?: string;
}
// Service watch callback
/** Listener invoked with every service event published by a registry. */
type ServiceWatchCallback = (event: ServiceEvent) => void;
// Service event
/** Notification delivered to watchers when the registry changes. */
interface ServiceEvent {
  /** What happened to the service. */
  type: 'registered' | 'unregistered' | 'health_changed';
  /** The affected service (for 'health_changed', the copy carrying the new status). */
  service: Service;
}
// Service watcher
/** Handle returned by watchServices. */
interface ServiceWatcher {
  /** Unsubscribes the callback this handle was created for. */
  stop(): void;
}
// Service health status
/** Aggregated health report for one service. */
interface ServiceHealth {
  serviceId: string;
  /** Overall status derived from the individual checks. */
  status: ServiceStatus;
  /** Individual check results (MemoryServiceDiscovery produces one per instance). */
  checks: HealthCheck[];
  /** When this report was produced. */
  lastUpdated: Date;
}
// Health check
/** Result of a single health probe. */
interface HealthCheck {
  /** Identifier of the probe (MemoryServiceDiscovery uses the instance id). */
  id: string;
  /** Human-readable name of the probe. */
  name: string;
  /** Probe outcome. */
  status: 'passing' | 'warning' | 'critical';
  /** Optional detail such as 'OK', an HTTP status or an error message. */
  output?: string;
  /** When the probe ran. */
  lastChecked: Date;
}
// In-memory service discovery implementation
/**
 * Service registry kept entirely in process memory.
 *
 * Construction starts a 30-second interval that probes every registered
 * service; call `destroy()` to stop it (the timer otherwise keeps running
 * for the lifetime of the process).
 */
class MemoryServiceDiscovery implements ServiceDiscovery {
  private services: Map<string, Service> = new Map();
  private watchers: Set<ServiceWatchCallback> = new Set();
  private healthChecks: Map<string, ServiceHealth> = new Map();
  private healthCheckInterval: NodeJS.Timeout | null = null;

  constructor() {
    this.startHealthChecking();
  }

  /**
   * Stores the service, seeds its health report as ACTIVE with no checks,
   * and emits a 'registered' event.
   */
  async registerService(service: Service): Promise<void> {
    this.services.set(service.id, service);
    this.healthChecks.set(service.id, {
      serviceId: service.id,
      status: ServiceStatus.ACTIVE,
      checks: [],
      lastUpdated: new Date()
    });
    this.notifyWatchers({ type: 'registered', service });
  }

  /**
   * Removes the service and its health report and emits 'unregistered'.
   * Unknown ids are ignored silently.
   */
  async unregisterService(serviceId: string): Promise<void> {
    const service = this.services.get(serviceId);
    if (!service) {
      return;
    }
    this.services.delete(serviceId);
    this.healthChecks.delete(serviceId);
    this.notifyWatchers({ type: 'unregistered', service });
  }

  /**
   * Lists registered services matching the query.
   * `name` matches by substring, `status` exactly, and `tags` keeps services
   * that carry at least one of the requested tags. `datacenter` is ignored.
   */
  async discoverServices(query: ServiceDiscoveryQuery = {}): Promise<Service[]> {
    const { name, status, tags } = query;
    let services = Array.from(this.services.values());
    if (name) {
      services = services.filter(s => s.name.includes(name));
    }
    if (status) {
      services = services.filter(s => s.status === status);
    }
    if (tags && tags.length > 0) {
      services = services.filter(s => {
        const ownTags = s.tags;
        return ownTags && tags.some(tag => ownTags.includes(tag));
      });
    }
    return services;
  }

  /** Subscribes a callback to registry events. */
  watchServices(callback: ServiceWatchCallback): ServiceWatcher {
    this.watchers.add(callback);
    return {
      stop: () => {
        this.watchers.delete(callback);
      }
    };
  }

  /**
   * Returns the latest health report for a service.
   * @throws Error when no report exists for `serviceId`.
   */
  async getServiceHealth(serviceId: string): Promise<ServiceHealth> {
    const health = this.healthChecks.get(serviceId);
    if (!health) {
      throw new Error(`服务健康状态不存在: ${serviceId}`);
    }
    return health;
  }

  /** Stops the periodic health checks. Safe to call more than once. */
  destroy(): void {
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
      this.healthCheckInterval = null;
    }
  }

  /** Delivers an event to every watcher; a throwing watcher is logged and skipped. */
  private notifyWatchers(event: ServiceEvent): void {
    for (const callback of this.watchers) {
      try {
        callback(event);
      } catch (error) {
        console.error('服务监听器回调错误:', error);
      }
    }
  }

  /** Schedules a health-check round every 30 seconds. */
  private startHealthChecking(): void {
    this.healthCheckInterval = setInterval(() => {
      // performHealthChecks handles per-service failures itself; `void`
      // marks the promise as intentionally not awaited.
      void this.performHealthChecks();
    }, 30000);
  }

  /**
   * Probes every registered service, stores the new report and emits
   * 'health_changed' when the overall status differs from the previous one.
   */
  private async performHealthChecks(): Promise<void> {
    for (const [serviceId, service] of this.services) {
      try {
        const health = await this.checkServiceHealth(service);

        // The probe is asynchronous: if the service was unregistered or
        // replaced meanwhile, storing this result would resurrect a stale
        // entry or overwrite the newer registration.
        if (this.services.get(serviceId) !== service) {
          continue;
        }

        const previous = this.healthChecks.get(serviceId);
        this.healthChecks.set(serviceId, health);

        if (previous && previous.status !== health.status) {
          const updatedService = { ...service, status: health.status };
          this.services.set(serviceId, updatedService);
          this.notifyWatchers({ type: 'health_changed', service: updatedService });
        }
      } catch (error) {
        console.error(`服务健康检查失败 [${serviceId}]:`, error);
      }
    }
  }

  /**
   * Probes each instance in turn and folds the results into one report:
   * any 'critical' check yields UNHEALTHY, otherwise any 'warning' yields
   * INACTIVE, otherwise ACTIVE.
   */
  private async checkServiceHealth(service: Service): Promise<ServiceHealth> {
    const checks: HealthCheck[] = [];
    let overallStatus = ServiceStatus.ACTIVE;

    for (const instance of service.instances) {
      let check: HealthCheck;
      try {
        check = await this.checkInstanceHealth(instance, service.healthCheck);
      } catch (error: unknown) {
        check = this.criticalCheck(instance, error);
      }
      checks.push(check);

      if (check.status === 'critical') {
        overallStatus = ServiceStatus.UNHEALTHY;
      } else if (check.status === 'warning' && overallStatus === ServiceStatus.ACTIVE) {
        overallStatus = ServiceStatus.INACTIVE;
      }
    }

    return {
      serviceId: service.id,
      status: overallStatus,
      checks,
      lastUpdated: new Date()
    };
  }

  /**
   * Issues a GET against the instance's health endpoint.
   * Without a health configuration the instance is reported as 'passing'.
   * The request is aborted after `healthConfig.timeout` seconds via an
   * AbortController, because `timeout` is not an option of the standard
   * Fetch API and would otherwise be ignored.
   */
  private async checkInstanceHealth(
    instance: ServiceInstance,
    healthConfig?: HealthCheckConfig
  ): Promise<HealthCheck> {
    const name = `Instance ${instance.id}`;
    if (!healthConfig) {
      return { id: instance.id, name, status: 'passing', lastChecked: new Date() };
    }

    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), healthConfig.timeout * 1000);
    try {
      const healthUrl = new URL(healthConfig.path, instance.url);
      const response = await fetch(healthUrl.toString(), {
        method: 'GET',
        signal: controller.signal
      });
      return {
        id: instance.id,
        name,
        status: response.ok ? 'passing' : 'critical',
        output: response.ok ? 'OK' : `HTTP ${response.status}`,
        lastChecked: new Date()
      };
    } catch (error: unknown) {
      return this.criticalCheck(instance, error);
    } finally {
      clearTimeout(timer);
    }
  }

  /** Builds a 'critical' check result describing a failed probe. */
  private criticalCheck(instance: ServiceInstance, error: unknown): HealthCheck {
    return {
      id: instance.id,
      name: `Instance ${instance.id}`,
      status: 'critical',
      output: error instanceof Error ? error.message : String(error),
      lastChecked: new Date()
    };
  }
}
// Consul service discovery implementation
/**
 * Skeleton of a Consul-backed ServiceDiscovery. The Consul calls are only
 * sketched in commented-out code, so as written: registerService and
 * unregisterService do nothing, discoverServices always resolves to an empty
 * list, watchServices only tracks the callback locally, and getServiceHealth
 * always throws 'Not implemented'.
 */
class ConsulServiceDiscovery implements ServiceDiscovery {
  private consulClient: any; // Consul client (never assigned in this skeleton)
  private watchers: Set<ServiceWatchCallback> = new Set();
  /**
   * @param consulConfig Intended for the Consul client; currently unused.
   */
  constructor(consulConfig: any) {
    // Initialize the Consul client
    // this.consulClient = new Consul(consulConfig);
  }
  // Register a service (no-op until the Consul call below is enabled)
  async registerService(service: Service): Promise<void> {
    // Consul service registration sketch
    // await this.consulClient.agent.service.register({
    // id: service.id,
    // name: service.name,
    // tags: service.tags,
    // address: service.instances[0]?.url,
    // port: this.extractPort(service.instances[0]?.url),
    // check: service.healthCheck ? {
    // http: service.healthCheck.path,
    // interval: `${service.healthCheck.interval}s`
    // } : undefined
    // });
  }
  // Unregister a service (no-op until the Consul call below is enabled)
  async unregisterService(serviceId: string): Promise<void> {
    // Consul service deregistration sketch
    // await this.consulClient.agent.service.deregister(serviceId);
  }
  // Discover services (currently always resolves to an empty array)
  async discoverServices(query: ServiceDiscoveryQuery = {}): Promise<Service[]> {
    // Consul service discovery sketch
    // const services = await this.consulClient.health.service({
    // service: query.name,
    // tag: query.tags,
    // passing: query.status === ServiceStatus.ACTIVE
    // });
    //
    // return this.convertConsulServices(services);
    return [];
  }
  // Watch for service changes (the callback is stored but never invoked here)
  watchServices(callback: ServiceWatchCallback): ServiceWatcher {
    this.watchers.add(callback);
    // Consul service watch sketch
    // const watcher = this.consulClient.watch({
    // method: this.consulClient.health.service,
    // options: { service: 'all' }
    // });
    //
    // watcher.on('change', (data: any) => {
    // // Handle the service change event
    // });
    return {
      stop: () => {
        this.watchers.delete(callback);
        // watcher.end();
      }
    };
  }
  // Get service health (currently always throws)
  async getServiceHealth(serviceId: string): Promise<ServiceHealth> {
    // Consul health query sketch
    // const health = await this.consulClient.health.service({
    // service: serviceId
    // });
    //
    // return this.convertConsulHealth(health);
    throw new Error('Not implemented');
  }
}
```
## 13.9 使用示例
### 13.9.1 API网关与服务集成示例
```typescript
// Complete example of integrating the API gateway with services
/**
 * Walk-through of the gateway features: service and route registration,
 * request handling, load balancing, circuit breaking, rate limiting,
 * metrics, service discovery and health checks. All output goes to the
 * console.
 *
 * NOTE(review): `serviceDiscovery` is a standalone MemoryServiceDiscovery
 * and is not passed to the gateway manager, so services registered through
 * the gateway are presumably unknown to it — confirm; if so,
 * demonstrateHealthCheck will report the lookup failure it catches.
 */
class LowCodeApiGatewayDemo {
  private readonly gatewayManager: LowCodeApiGatewayManager;
  private readonly serviceDiscovery: MemoryServiceDiscovery;
  private readonly eventBus: EventBus;

  constructor() {
    this.eventBus = new LowCodeEventBus();
    this.serviceDiscovery = new MemoryServiceDiscovery();

    // Authentication manager. The secret is a placeholder for the demo;
    // real deployments must load it from configuration.
    const authManager = new JWTAuthenticationManager({
      secret: 'your-secret-key',
      expiresIn: '1h'
    });

    // API gateway manager
    this.gatewayManager = new LowCodeApiGatewayManager({
      authManager,
      eventBus: this.eventBus
    });

    this.setupEventListeners();
  }

  /** Converts a caught value of unknown type into a printable message. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /** Registers the user and product services and creates their routes. */
  async demonstrateBasicGateway(): Promise<void> {
    console.log('=== API网关基本功能演示 ===');

    // 1. Register the user service
    const userService = await this.gatewayManager.registerService({
      name: 'user-service',
      description: '用户管理服务',
      version: '1.0.0',
      instances: [
        {
          id: 'user-service-1',
          url: 'http://localhost:3001',
          weight: 1,
          status: InstanceStatus.HEALTHY
        },
        {
          id: 'user-service-2',
          url: 'http://localhost:3002',
          weight: 1,
          status: InstanceStatus.HEALTHY
        }
      ],
      healthCheck: {
        path: '/health',
        interval: 30,
        timeout: 5,
        healthyThreshold: 2,
        unhealthyThreshold: 3
      },
      tags: ['user', 'auth']
    });
    console.log('用户服务已注册:', userService.id);

    // 2. Create the user routes
    const userRoutes = [
      {
        path: '/api/users',
        methods: [HttpMethod.GET, HttpMethod.POST],
        serviceId: userService.id,
        requireAuth: true,
        rateLimit: {
          requests: 100,
          window: 60,
          keyBy: 'user' as const
        },
        description: '用户管理API'
      },
      {
        path: '/api/users/{id}',
        methods: [HttpMethod.GET, HttpMethod.PUT, HttpMethod.DELETE],
        serviceId: userService.id,
        requireAuth: true,
        rateLimit: {
          requests: 50,
          window: 60,
          keyBy: 'user' as const
        },
        description: '单个用户操作API'
      }
    ];
    for (const routeDefinition of userRoutes) {
      const route = await this.gatewayManager.createRoute(routeDefinition);
      console.log('路由已创建:', route.path);
    }

    // 3. Register the product service
    const productService = await this.gatewayManager.registerService({
      name: 'product-service',
      description: '产品管理服务',
      version: '1.0.0',
      instances: [
        {
          id: 'product-service-1',
          url: 'http://localhost:3003',
          weight: 2,
          status: InstanceStatus.HEALTHY
        }
      ],
      healthCheck: {
        path: '/health',
        interval: 30,
        timeout: 5,
        healthyThreshold: 2,
        unhealthyThreshold: 3
      },
      tags: ['product', 'catalog']
    });

    // 4. Create the (public) product route
    await this.gatewayManager.createRoute({
      path: '/api/products',
      methods: [HttpMethod.GET],
      serviceId: productService.id,
      requireAuth: false,
      rateLimit: {
        requests: 200,
        window: 60,
        keyBy: 'ip' as const
      },
      description: '产品列表API(公开)'
    });
    console.log('产品服务和路由已配置');
  }

  /** Sends three simulated requests through the gateway and logs each outcome. */
  async demonstrateRequestHandling(): Promise<void> {
    console.log('\n=== 请求处理演示 ===');

    // Simulated requests
    const requests: GatewayRequest[] = [
      {
        path: '/api/users',
        method: 'GET',
        headers: {
          'authorization': 'Bearer valid-token',
          'user-agent': 'Test Client'
        },
        clientIp: '192.168.1.100'
      },
      {
        path: '/api/products',
        method: 'GET',
        headers: {
          'user-agent': 'Test Client'
        },
        clientIp: '192.168.1.101'
      },
      {
        path: '/api/users/123',
        method: 'PUT',
        headers: {
          'authorization': 'Bearer valid-token',
          'content-type': 'application/json'
        },
        body: JSON.stringify({ name: 'Updated User' }),
        clientIp: '192.168.1.100'
      }
    ];

    for (const request of requests) {
      try {
        console.log(`\n处理请求: ${request.method} ${request.path}`);
        const response = await this.gatewayManager.handleRequest(request);
        console.log(`响应状态: ${response.statusCode}`);
        if (response.statusCode >= 400) {
          console.log('错误响应:', response.body);
        }
      } catch (error: unknown) {
        console.error('请求处理失败:', LowCodeApiGatewayDemo.describeError(error));
      }
    }
  }

  /** Lists the user-service instances and prints an illustrative round-robin. */
  async demonstrateLoadBalancing(): Promise<void> {
    console.log('\n=== 负载均衡演示 ===');

    const services = await this.gatewayManager.getServices({ name: 'user-service' });
    const userService = services[0];
    if (!userService) {
      return;
    }

    console.log('用户服务实例:');
    for (const instance of userService.instances) {
      console.log(`- ${instance.id}: ${instance.url} (权重: ${instance.weight})`);
    }

    console.log('\n模拟10次请求的负载均衡:');
    for (let i = 0; i < 10; i++) {
      // Illustration only: no request is sent; a real request would be
      // dispatched by the gateway's load balancer.
      console.log(`请求 ${i + 1}: 路由到实例 user-service-${(i % 2) + 1}`);
    }
  }

  /** Prints a narrated circuit-breaker scenario; no breaker is actually driven. */
  async demonstrateCircuitBreaker(): Promise<void> {
    console.log('\n=== 熔断器演示 ===');

    const services = await this.gatewayManager.getServices();
    const serviceId = services[0]?.id;
    if (!serviceId) {
      return;
    }

    console.log('模拟服务失败...');
    for (let i = 0; i < 5; i++) {
      // Only a log line; failures are not recorded anywhere.
      console.log(`失败 ${i + 1}: 记录到熔断器`);
    }
    console.log('熔断器状态: 可能已打开(基于失败阈值)');

    // Not awaited: the recovery message is printed two seconds later,
    // interleaved with whatever the demo is doing by then.
    setTimeout(() => {
      console.log('服务恢复,熔断器进入半开状态');
    }, 2000);
  }

  /** Fires eight rapid requests at a 5-per-10s token bucket limiter. */
  async demonstrateRateLimiting(): Promise<void> {
    console.log('\n=== 限流演示 ===');

    const rateLimiter = new TokenBucketRateLimiter({
      requests: 5,
      window: 10,
      keyBy: 'ip'
    });
    const clientIp = '192.168.1.100';

    console.log('模拟客户端快速请求:');
    for (let i = 0; i < 8; i++) {
      const allowed = await rateLimiter.isAllowed(clientIp);
      const remaining = await rateLimiter.getRemainingRequests(clientIp);
      console.log(`请求 ${i + 1}: ${allowed ? '允许' : '拒绝'}, 剩余: ${remaining}`);
      if (!allowed) {
        console.log('触发限流,请求被拒绝');
      }
    }
  }

  /** Prints the gateway-wide and per-service metrics. */
  async demonstrateMetrics(): Promise<void> {
    console.log('\n=== 指标收集演示 ===');

    const metrics = await this.gatewayManager.getMetrics();
    console.log('API网关指标:');
    console.log(`- 总请求数: ${metrics.requests.total}`);
    console.log(`- 成功请求: ${metrics.requests.successful}`);
    console.log(`- 失败请求: ${metrics.requests.failed}`);
    console.log(`- 请求速率: ${metrics.requests.rate.toFixed(2)} req/s`);
    console.log(`- 平均延迟: ${metrics.latency.average.toFixed(2)}ms`);
    console.log(`- P95延迟: ${metrics.latency.p95.toFixed(2)}ms`);
    console.log(`- 错误率: ${metrics.errors.rate.toFixed(2)} err/s`);

    console.log('\n服务指标:');
    for (const [serviceId, serviceMetrics] of Object.entries(metrics.services)) {
      console.log(`- ${serviceId}:`);
      console.log(` 请求数: ${serviceMetrics.requests}`);
      console.log(` 错误数: ${serviceMetrics.errors}`);
      console.log(` 可用性: ${(serviceMetrics.availability * 100).toFixed(2)}%`);
    }
  }

  /** Registers a notification service, looks it up by tag and stops watching after 1s. */
  async demonstrateServiceDiscovery(): Promise<void> {
    console.log('\n=== 服务发现演示 ===');

    // Watch registry changes
    const watcher = this.serviceDiscovery.watchServices((event) => {
      console.log(`服务事件: ${event.type} - ${event.service.name}`);
    });

    // Register a new service
    await this.serviceDiscovery.registerService({
      id: 'notification-service',
      name: 'notification-service',
      description: '通知服务',
      version: '1.0.0',
      status: ServiceStatus.ACTIVE,
      instances: [
        {
          id: 'notification-service-1',
          url: 'http://localhost:3004',
          weight: 1,
          status: InstanceStatus.HEALTHY
        }
      ],
      tags: ['notification', 'messaging'],
      registeredAt: new Date(),
      lastHealthCheck: new Date()
    });

    // Discover by tag
    const services = await this.serviceDiscovery.discoverServices({
      tags: ['notification']
    });
    console.log('发现的通知服务:', services.map(s => s.name));

    // Stop watching after one second (not awaited)
    setTimeout(() => {
      watcher.stop();
      console.log('停止服务监听');
    }, 1000);
  }

  /** Prints the gateway health summary and the health of the first service. */
  async demonstrateHealthCheck(): Promise<void> {
    console.log('\n=== 健康检查演示 ===');

    const healthStatus = await this.gatewayManager.getHealthStatus();
    console.log('API网关健康状态:');
    console.log(`- 整体状态: ${healthStatus.status}`);
    console.log(`- 服务总数: ${healthStatus.services.total}`);
    console.log(`- 健康服务: ${healthStatus.services.healthy}`);
    console.log(`- 不健康服务: ${healthStatus.services.unhealthy}`);
    console.log(`- 路由总数: ${healthStatus.routes.total}`);
    console.log(`- 活跃路由: ${healthStatus.routes.active}`);

    const services = await this.gatewayManager.getServices();
    const firstService = services[0];
    if (!firstService) {
      return;
    }

    try {
      const serviceHealth = await this.serviceDiscovery.getServiceHealth(firstService.id);
      console.log(`\n服务 ${firstService.name} 健康状态:`);
      console.log(`- 状态: ${serviceHealth.status}`);
      console.log(`- 检查项数量: ${serviceHealth.checks.length}`);
      for (const check of serviceHealth.checks) {
        console.log(` - ${check.name}: ${check.status}`);
      }
    } catch (error: unknown) {
      console.log('获取服务健康状态失败:', LowCodeApiGatewayDemo.describeError(error));
    }
  }

  /** Logs route creation, service registration and processed requests. */
  private setupEventListeners(): void {
    this.eventBus.on('route.created', (event) => {
      console.log(`路由已创建: ${event.route.path}`);
    });
    this.eventBus.on('service.registered', (event) => {
      console.log(`服务已注册: ${event.service.name}`);
    });
    this.eventBus.on('request.processed', (event) => {
      console.log(`请求已处理: ${event.request.method} ${event.request.path} -> ${event.response.statusCode}`);
    });
  }

  /**
   * Runs every demonstration in order. Errors are logged, not rethrown.
   * The service discovery's health-check timer is stopped afterwards so the
   * 30-second interval does not keep the Node.js process alive.
   */
  async runDemo(): Promise<void> {
    try {
      await this.demonstrateBasicGateway();
      await this.demonstrateRequestHandling();
      await this.demonstrateLoadBalancing();
      await this.demonstrateCircuitBreaker();
      await this.demonstrateRateLimiting();
      await this.demonstrateMetrics();
      await this.demonstrateServiceDiscovery();
      await this.demonstrateHealthCheck();
      console.log('\n=== API网关演示完成 ===');
    } catch (error) {
      console.error('演示过程中发生错误:', error);
    } finally {
      this.serviceDiscovery.destroy();
    }
  }
}
// Run the demo. runDemo handles its own errors, so the returned promise is
// deliberately discarded.
const demo = new LowCodeApiGatewayDemo();
void demo.runDemo();
```
### 13.9.2 Web API集成示例
```typescript
// Express.js集成示例
import express from 'express';
import { LowCodeApiGatewayManager } from './api-gateway';
// Express application hosting the gateway's admin API and proxy.
const app = express();

// The JWT signing secret is read from the JWT_SECRET environment variable;
// the literal fallback keeps the example runnable but must not be used
// outside local experiments.
const gatewayManager = new LowCodeApiGatewayManager({
  authManager: new JWTAuthenticationManager({ secret: process.env.JWT_SECRET ?? 'secret' }),
  eventBus: new LowCodeEventBus()
});
// Body-parsing middleware: JSON and URL-encoded payloads
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

// CORS middleware: allows every origin and answers preflight requests
// directly with 200 instead of passing them on.
app.use((req, res, next) => {
  const corsHeaders: Array<[string, string]> = [
    ['Access-Control-Allow-Origin', '*'],
    ['Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS'],
    ['Access-Control-Allow-Headers', 'Content-Type, Authorization']
  ];
  for (const [headerName, headerValue] of corsHeaders) {
    res.header(headerName, headerValue);
  }

  if (req.method !== 'OPTIONS') {
    next();
    return;
  }
  res.sendStatus(200);
});
// Service management API.
// Every handler answers with { success, data? , error? }. Caught values are
// typed `unknown`, so the message is extracted only after narrowing.

// Register a service from the request body (400 on failure)
app.post('/gateway/services', async (req, res) => {
  try {
    const service = await gatewayManager.registerService(req.body);
    res.json({ success: true, data: service });
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    res.status(400).json({ success: false, error: message });
  }
});

// List services, filtered by the query string (500 on failure)
app.get('/gateway/services', async (req, res) => {
  try {
    const services = await gatewayManager.getServices(req.query);
    res.json({ success: true, data: services });
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    res.status(500).json({ success: false, error: message });
  }
});

// Fetch one service by id (404 when missing, 500 on failure)
app.get('/gateway/services/:id', async (req, res) => {
  try {
    const service = await gatewayManager.getService(req.params.id);
    if (service) {
      res.json({ success: true, data: service });
    } else {
      res.status(404).json({ success: false, error: '服务不存在' });
    }
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    res.status(500).json({ success: false, error: message });
  }
});

// Update one service by id (400 on failure)
app.put('/gateway/services/:id', async (req, res) => {
  try {
    const service = await gatewayManager.updateService(req.params.id, req.body);
    res.json({ success: true, data: service });
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    res.status(400).json({ success: false, error: message });
  }
});

// Unregister one service by id (400 on failure)
app.delete('/gateway/services/:id', async (req, res) => {
  try {
    await gatewayManager.unregisterService(req.params.id);
    res.json({ success: true });
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    res.status(400).json({ success: false, error: message });
  }
});
// Route management API.
// Every handler answers with { success, data?, error? }. Caught values are
// typed `unknown`, so the message is extracted only after narrowing.

// Create a route from the request body (400 on failure)
app.post('/gateway/routes', async (req, res) => {
  try {
    const route = await gatewayManager.createRoute(req.body);
    res.json({ success: true, data: route });
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    res.status(400).json({ success: false, error: message });
  }
});

// List routes, filtered by the query string (500 on failure)
app.get('/gateway/routes', async (req, res) => {
  try {
    const routes = await gatewayManager.getRoutes(req.query);
    res.json({ success: true, data: routes });
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    res.status(500).json({ success: false, error: message });
  }
});

// Fetch one route by id (404 when missing, 500 on failure)
app.get('/gateway/routes/:id', async (req, res) => {
  try {
    const route = await gatewayManager.getRoute(req.params.id);
    if (route) {
      res.json({ success: true, data: route });
    } else {
      res.status(404).json({ success: false, error: '路由不存在' });
    }
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    res.status(500).json({ success: false, error: message });
  }
});

// Update one route by id (400 on failure)
app.put('/gateway/routes/:id', async (req, res) => {
  try {
    const route = await gatewayManager.updateRoute(req.params.id, req.body);
    res.json({ success: true, data: route });
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    res.status(400).json({ success: false, error: message });
  }
});

// Delete one route by id (400 on failure)
app.delete('/gateway/routes/:id', async (req, res) => {
  try {
    await gatewayManager.deleteRoute(req.params.id);
    res.json({ success: true });
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    res.status(400).json({ success: false, error: message });
  }
});
// Metrics API: returns the manager's metrics, filtered by the query string.
app.get('/gateway/metrics', async (req, res) => {
  try {
    const metrics = await gatewayManager.getMetrics(req.query);
    res.json({ success: true, data: metrics });
  } catch (error) {
    // A thrown value is not guaranteed to be an Error; narrow before reading `.message`.
    const message = error instanceof Error ? error.message : String(error);
    res.status(500).json({ success: false, error: message });
  }
});

// Health-check API: returns the manager's aggregated health status.
app.get('/gateway/health', async (req, res) => {
  try {
    const health = await gatewayManager.getHealthStatus();
    res.json({ success: true, data: health });
  } catch (error) {
    // A thrown value is not guaranteed to be an Error; narrow before reading `.message`.
    const message = error instanceof Error ? error.message : String(error);
    res.status(500).json({ success: false, error: message });
  }
});
// Catch-all: any request not handled by an earlier handler is proxied through the gateway.
app.all('*', async (req, res) => {
  try {
    // Node reports repeated headers as string[] and absent ones as undefined, while
    // GatewayRequest expects plain strings. Flatten them instead of asserting the type.
    const headers: Record<string, string> = {};
    for (const [name, value] of Object.entries(req.headers)) {
      if (value !== undefined) {
        headers[name] = Array.isArray(value) ? value.join(', ') : value;
      }
    }

    // Query values may be arrays or nested objects; coerce them to the string form they
    // would take anyway once written into a URL's search parameters.
    const queryParams: Record<string, string> = {};
    for (const [name, value] of Object.entries(req.query)) {
      if (value !== undefined) {
        queryParams[name] = String(value);
      }
    }

    const gatewayRequest: GatewayRequest = {
      path: req.path,
      method: req.method,
      headers,
      queryParams,
      body: req.body ? JSON.stringify(req.body) : undefined,
      // `req.connection` is a deprecated alias; `req.socket` is the supported accessor.
      clientIp: req.ip || req.socket.remoteAddress || 'unknown'
    };

    const gatewayResponse = await gatewayManager.handleRequest(gatewayRequest);

    // Copy the gateway's response headers onto the outgoing response.
    Object.entries(gatewayResponse.headers).forEach(([key, value]) => {
      res.setHeader(key, value);
    });

    // Send status and body exactly as produced by the gateway.
    res.status(gatewayResponse.statusCode).send(gatewayResponse.body);
  } catch (error) {
    // A thrown value is not guaranteed to be an Error; narrow before reading `.message`.
    const details = error instanceof Error ? error.message : String(error);
    res.status(500).json({
      error: {
        code: 500,
        message: 'Internal Gateway Error',
        details
      }
    });
  }
});
// Start the HTTP server. The port comes from the PORT environment variable and falls back
// to 8080 when that variable is unset or empty.
const PORT = process.env.PORT || 8080;
app.listen(PORT, function onListening() {
  console.log('API网关服务器运行在端口 ' + PORT);
});
## 13.10 本章小结
在本章中,我们深入学习了低代码平台中的API网关与服务集成系统。通过完整的架构设计和代码实现,我们掌握了以下核心要点:
### 核心功能

- **API网关架构**:设计了完整的API网关管理器,支持路由管理、服务注册、中间件处理和请求转发
- **请求路由**:实现了高效的路由匹配算法,支持路径参数、HTTP方法匹配和动态路由更新
- **负载均衡**:提供了多种负载均衡策略,包括轮询、加权轮询和最少连接算法
- **熔断器**:实现了服务熔断机制,支持故障检测、状态管理和自动恢复
- **限流控制**:提供了令牌桶和滑动窗口两种限流算法,支持多维度限流策略
- **指标收集**:实现了全面的性能监控,包括请求统计、延迟分析和错误追踪
- **服务发现**:支持服务注册、发现、健康检查和动态更新
- **中间件系统**:提供了可扩展的中间件架构,支持认证、授权、转换等功能
### 技术特色

- **高性能**:采用高效的数据结构和算法,支持高并发请求处理
- **可扩展性**:模块化设计,支持自定义中间件、负载均衡策略和服务发现机制
- **容错性**:内置熔断器、重试机制和降级策略,提高系统稳定性
- **监控能力**:全面的指标收集和健康检查,支持实时监控和告警
- **类型安全**:完整的TypeScript类型定义,提供编译时类型检查
- **事件驱动**:基于事件总线的松耦合架构,支持组件间异步通信
- **配置灵活**:支持动态配置更新,无需重启服务即可调整路由和策略
- **安全性**:内置认证授权、限流防护和请求验证机制
### 应用场景

- 微服务架构:作为微服务间的统一入口和流量管理中心
- API管理:提供API版本控制、访问控制和使用统计
- 负载均衡:实现服务实例间的流量分发和故障转移
- 安全网关:集中处理认证、授权和安全策略
- 监控中心:收集和分析API使用情况和性能指标
通过本章的学习,你已经掌握了构建企业级API网关的核心技术。在下一章中,我们将学习低代码平台的部署与运维,了解如何将低代码平台部署到生产环境并进行有效的运维管理。
// API gateway manager interface: the management surface (routes, services, middlewares),
// the request entry point, and the monitoring queries of the gateway.
interface ApiGatewayManager {
// Route management
createRoute(route: RouteDefinition): Promise<Route>;
updateRoute(id: string, route: RouteDefinition): Promise<Route>;
deleteRoute(id: string): Promise<void>;
// Resolves to null when no route has the given id.
getRoute(id: string): Promise<Route | null>;
getRoutes(query?: RouteQuery): Promise<Route[]>;
// Service management
registerService(service: ServiceDefinition): Promise<Service>;
updateService(id: string, service: ServiceDefinition): Promise<Service>;
unregisterService(id: string): Promise<void>;
// Resolves to null when no service has the given id.
getService(id: string): Promise<Service | null>;
getServices(query?: ServiceQuery): Promise<Service[]>;
// Middleware management
addMiddleware(middleware: MiddlewareDefinition): Promise<Middleware>;
removeMiddleware(id: string): Promise<void>;
getMiddlewares(): Promise<Middleware[]>;
// Request handling
handleRequest(request: GatewayRequest): Promise<GatewayResponse>;
// Monitoring and statistics
getMetrics(query?: MetricsQuery): Promise<ApiMetrics>;
getHealthStatus(): Promise<HealthStatus>;
}
/**
 * In-memory implementation of ApiGatewayManager.
 *
 * Routes, services and middlewares live in Maps. Routes are mirrored into a RequestRouter
 * and services into a load balancer; lifecycle changes are published on the injected event
 * bus; requests are proxied to upstream instances with `fetch`.
 */
class LowCodeApiGatewayManager implements ApiGatewayManager {
  /**
   * Fallback circuit-breaker settings. DefaultCircuitBreaker's constructor requires a
   * default config, so one must always be supplied. The values are conventional defaults.
   */
  private static readonly DEFAULT_CIRCUIT_BREAKER_CONFIG: CircuitBreakerConfig = {
    failureThreshold: 5,
    recoveryTimeout: 30,
    monitoringPeriod: 60
  };

  /** Request headers describing the client-to-gateway hop; they must not be replayed upstream. */
  private static readonly HOP_REQUEST_HEADERS: ReadonlySet<string> = new Set([
    'host',
    'connection',
    'keep-alive',
    'transfer-encoding',
    'content-length',
    'upgrade',
    'te',
    'trailer',
    'proxy-authorization',
    'proxy-authenticate'
  ]);

  /** Response headers that no longer describe the body once `fetch` has decoded and buffered it. */
  private static readonly STALE_RESPONSE_HEADERS: ReadonlySet<string> = new Set([
    'connection',
    'keep-alive',
    'transfer-encoding',
    'content-length',
    'content-encoding'
  ]);

  private routes: Map<string, Route> = new Map();
  private services: Map<string, Service> = new Map();
  private middlewares: Map<string, Middleware> = new Map();
  private router: RequestRouter;
  private loadBalancer: LoadBalancer;
  private circuitBreaker: CircuitBreaker;
  private rateLimiter: RateLimiter;
  private authManager: AuthenticationManager;
  private metricsCollector: MetricsCollector;
  private eventBus: EventBus;

  /**
   * @param options Collaborators that cannot be defaulted: the authentication manager and
   *                the event bus. All other components use their default implementations.
   */
  constructor(options: ApiGatewayOptions) {
    this.router = new RequestRouter();
    this.loadBalancer = new RoundRobinLoadBalancer();
    this.circuitBreaker = new DefaultCircuitBreaker(
      LowCodeApiGatewayManager.DEFAULT_CIRCUIT_BREAKER_CONFIG
    );
    this.rateLimiter = new TokenBucketRateLimiter();
    this.authManager = options.authManager;
    this.metricsCollector = new DefaultMetricsCollector();
    this.eventBus = options.eventBus;
    this.initializeDefaultMiddlewares();
  }

  /**
   * Creates, validates and registers a route, then emits `route.created`.
   * @throws Error when the definition is incomplete or references an unknown service.
   */
  async createRoute(routeDefinition: RouteDefinition): Promise<Route> {
    const route: Route = {
      // Spread first so caller-supplied `id`/`status`/timestamps can never override ours.
      ...routeDefinition,
      id: this.generateId(),
      status: RouteStatus.ACTIVE,
      createdAt: new Date(),
      updatedAt: new Date()
    };

    this.validateRoute(route);

    this.routes.set(route.id, route);
    this.router.addRoute(route);

    this.eventBus.emit('route.created', { route });
    return route;
  }

  /**
   * Merges a new definition over an existing route, re-validates it and emits `route.updated`.
   * @throws Error when the route does not exist or the merged route is invalid.
   */
  async updateRoute(id: string, routeDefinition: RouteDefinition): Promise<Route> {
    const existingRoute = this.routes.get(id);
    if (!existingRoute) {
      throw new Error(`路由不存在: ${id}`);
    }

    const updatedRoute: Route = {
      ...existingRoute,
      ...routeDefinition,
      id,
      updatedAt: new Date()
    };

    this.validateRoute(updatedRoute);

    this.routes.set(id, updatedRoute);
    this.router.updateRoute(updatedRoute);

    this.eventBus.emit('route.updated', { route: updatedRoute });
    return updatedRoute;
  }

  /**
   * Removes a route from the registry and the router, then emits `route.deleted`.
   * @throws Error when the route does not exist.
   */
  async deleteRoute(id: string): Promise<void> {
    if (!this.routes.has(id)) {
      throw new Error(`路由不存在: ${id}`);
    }

    this.routes.delete(id);
    this.router.removeRoute(id);

    this.eventBus.emit('route.deleted', { routeId: id });
  }

  /** Returns the route with the given id, or null when there is none. */
  async getRoute(id: string): Promise<Route | null> {
    return this.routes.get(id) || null;
  }

  /**
   * Lists routes, optionally filtered by path substring, method, service id and status,
   * and optionally sorted by the property named in `sortBy` (ascending unless `desc`).
   */
  async getRoutes(query: RouteQuery = {}): Promise<Route[]> {
    const { path, method, serviceId, status, sortBy, sortOrder } = query;
    let routes = Array.from(this.routes.values());

    if (path) {
      routes = routes.filter(r => r.path.includes(path));
    }
    if (method) {
      routes = routes.filter(r => r.methods.includes(method));
    }
    if (serviceId) {
      routes = routes.filter(r => r.serviceId === serviceId);
    }
    if (status) {
      routes = routes.filter(r => r.status === status);
    }

    if (sortBy) {
      const direction = sortOrder === 'desc' ? -1 : 1;
      routes.sort((a, b) => {
        const aValue = (a as any)[sortBy];
        const bValue = (b as any)[sortBy];
        // A comparator must return 0 for ties, or the ordering is inconsistent.
        if (aValue > bValue) {
          return direction;
        }
        if (aValue < bValue) {
          return -direction;
        }
        return 0;
      });
    }

    return routes;
  }

  /**
   * Creates, validates and registers a service, adds it to the load balancer and emits
   * `service.registered`.
   * @throws Error when the service has no name, no instances, or an instance without URL.
   */
  async registerService(serviceDefinition: ServiceDefinition): Promise<Service> {
    const service: Service = {
      // Spread first so caller-supplied `id`/`status`/timestamps can never override ours.
      ...serviceDefinition,
      id: this.generateId(),
      status: ServiceStatus.ACTIVE,
      registeredAt: new Date(),
      lastHealthCheck: new Date()
    };

    this.validateService(service);

    this.services.set(service.id, service);
    this.loadBalancer.addService(service);

    this.eventBus.emit('service.registered', { service });
    return service;
  }

  /**
   * Merges a new definition over an existing service, re-validates it and emits
   * `service.updated`.
   * @throws Error when the service does not exist or the merged service is invalid.
   */
  async updateService(id: string, serviceDefinition: ServiceDefinition): Promise<Service> {
    const existingService = this.services.get(id);
    if (!existingService) {
      throw new Error(`服务不存在: ${id}`);
    }

    const updatedService: Service = {
      ...existingService,
      ...serviceDefinition,
      id
    };

    this.validateService(updatedService);

    this.services.set(id, updatedService);
    this.loadBalancer.updateService(updatedService);

    this.eventBus.emit('service.updated', { service: updatedService });
    return updatedService;
  }

  /**
   * Removes a service from the registry and the load balancer, then emits
   * `service.unregistered`.
   * @throws Error when the service does not exist.
   */
  async unregisterService(id: string): Promise<void> {
    if (!this.services.has(id)) {
      throw new Error(`服务不存在: ${id}`);
    }

    this.services.delete(id);
    this.loadBalancer.removeService(id);

    this.eventBus.emit('service.unregistered', { serviceId: id });
  }

  /** Returns the service with the given id, or null when there is none. */
  async getService(id: string): Promise<Service | null> {
    return this.services.get(id) || null;
  }

  /**
   * Lists services, optionally filtered by name substring, status, and tags
   * (a service matches when it carries at least one of the requested tags).
   */
  async getServices(query: ServiceQuery = {}): Promise<Service[]> {
    const { name, status, tags } = query;
    let services = Array.from(this.services.values());

    if (name) {
      services = services.filter(s => s.name.includes(name));
    }
    if (status) {
      services = services.filter(s => s.status === status);
    }
    if (tags && tags.length > 0) {
      services = services.filter(s => {
        const ownTags = s.tags;
        return !!ownTags && tags.some(tag => ownTags.includes(tag));
      });
    }

    return services;
  }

  /** Registers a global middleware and emits `middleware.added`. */
  async addMiddleware(middlewareDefinition: MiddlewareDefinition): Promise<Middleware> {
    const middleware: Middleware = {
      // Spread first so a caller-supplied `id` can never override the generated one.
      ...middlewareDefinition,
      id: this.generateId(),
      createdAt: new Date()
    };

    this.middlewares.set(middleware.id, middleware);

    this.eventBus.emit('middleware.added', { middleware });
    return middleware;
  }

  /** Removes a global middleware; `middleware.removed` is emitted only if one was removed. */
  async removeMiddleware(id: string): Promise<void> {
    if (this.middlewares.delete(id)) {
      this.eventBus.emit('middleware.removed', { middlewareId: id });
    }
  }

  /** Returns all global middlewares ordered by ascending `order`. */
  async getMiddlewares(): Promise<Middleware[]> {
    return Array.from(this.middlewares.values())
      .sort((a, b) => a.order - b.order);
  }

  /**
   * Routes one request through the gateway:
   * route lookup -> service lookup -> middleware chain -> circuit breaker -> instance
   * selection -> upstream call.
   *
   * Never rejects. Failures become error responses: 404/502/503 for routing problems, the
   * status attached by a middleware (401, 429) for deliberate rejections, and 500 otherwise.
   * Only a failed upstream call counts against the service's circuit breaker.
   */
  async handleRequest(request: GatewayRequest): Promise<GatewayResponse> {
    const startTime = Date.now();

    try {
      this.metricsCollector.recordRequest(request);

      const route = this.router.findRoute(request.path, request.method);
      if (!route) {
        return this.createErrorResponse(404, 'Route not found');
      }
      if (route.status !== RouteStatus.ACTIVE) {
        return this.createErrorResponse(503, 'Route is not active');
      }

      const service = this.services.get(route.serviceId);
      if (!service) {
        return this.createErrorResponse(502, 'Service not found');
      }
      if (service.status !== ServiceStatus.ACTIVE) {
        return this.createErrorResponse(503, 'Service is not active');
      }

      const context: MiddlewareContext = {
        request,
        route,
        service,
        variables: new Map()
      };
      const middlewareChain = this.buildMiddlewareChain(route);
      const processedRequest = await this.executeMiddlewares(middlewareChain, context);

      if (this.circuitBreaker.isOpen(service.id)) {
        return this.createErrorResponse(503, 'Circuit breaker is open');
      }

      const instance = this.loadBalancer.selectInstance(service);
      if (!instance) {
        return this.createErrorResponse(502, 'No available service instance');
      }

      let response: GatewayResponse;
      try {
        response = await this.forwardRequest(processedRequest, instance, route);
      } catch (error) {
        // The incoming request never carries `route`, so the failing service must be
        // identified from the local scope for the breaker to ever see a failure.
        this.circuitBreaker.recordFailure(service.id);
        throw error;
      }

      this.circuitBreaker.recordSuccess(service.id);
      this.metricsCollector.recordResponse(response, Date.now() - startTime);
      return response;
    } catch (error) {
      this.metricsCollector.recordError(error);
      if (LowCodeApiGatewayManager.isGatewayError(error)) {
        return this.createErrorResponse(error.gatewayStatus, error.message);
      }
      return this.createErrorResponse(500, 'Internal server error');
    }
  }

  /** Returns the metrics collected so far, filtered by `query`. */
  async getMetrics(query: MetricsQuery = {}): Promise<ApiMetrics> {
    return this.metricsCollector.getMetrics(query);
  }

  /**
   * Summarises registry health: `healthy` when every service is ACTIVE, `degraded` otherwise,
   * together with service and route counts.
   */
  async getHealthStatus(): Promise<HealthStatus> {
    const services = Array.from(this.services.values());
    const totalServices = services.length;
    const healthyServices = services.filter(s => s.status === ServiceStatus.ACTIVE).length;

    const routes = Array.from(this.routes.values());
    const totalRoutes = routes.length;
    const activeRoutes = routes.filter(r => r.status === RouteStatus.ACTIVE).length;

    return {
      status: healthyServices === totalServices ? 'healthy' : 'degraded',
      services: {
        total: totalServices,
        healthy: healthyServices,
        unhealthy: totalServices - healthyServices
      },
      routes: {
        total: totalRoutes,
        active: activeRoutes,
        inactive: totalRoutes - activeRoutes
      },
      timestamp: new Date()
    };
  }

  /**
   * Registers the built-in middlewares: authentication (order 100), rate limiting (200)
   * and request transformation (300).
   */
  private initializeDefaultMiddlewares(): void {
    // `addMiddleware` has no await inside, so registration completes synchronously;
    // `void` marks the discarded promise as intentional.

    // Authentication: routes flagged `requireAuth` need a valid `authorization` header.
    void this.addMiddleware({
      name: 'authentication',
      type: MiddlewareType.AUTHENTICATION,
      order: 100,
      enabled: true,
      config: {},
      handler: async (context: MiddlewareContext) => {
        if (context.route.requireAuth) {
          const token = context.request.headers['authorization'];
          if (!token) {
            throw LowCodeApiGatewayManager.gatewayError(401, 'Authentication required');
          }
          const user = await this.authManager.validateToken(token);
          context.variables.set('user', user);
        }
        return context.request;
      }
    });

    // Rate limiting: the key is chosen by the route's `rateLimit.keyBy`.
    void this.addMiddleware({
      name: 'rate-limiting',
      type: MiddlewareType.RATE_LIMITING,
      order: 200,
      enabled: true,
      config: {},
      handler: async (context: MiddlewareContext) => {
        const key = this.getRateLimitKey(context.request, context.route);
        const allowed = await this.rateLimiter.isAllowed(key, context.route.rateLimit);
        if (!allowed) {
          throw LowCodeApiGatewayManager.gatewayError(429, 'Rate limit exceeded');
        }
        return context.request;
      }
    });

    // Request transformation: applies the route's `requestTransform`, if any.
    void this.addMiddleware({
      name: 'request-transformation',
      type: MiddlewareType.TRANSFORMATION,
      order: 300,
      enabled: true,
      config: {},
      handler: async (context: MiddlewareContext) => {
        if (context.route.requestTransform) {
          return this.transformRequest(context.request, context.route.requestTransform);
        }
        return context.request;
      }
    });
  }

  /** Builds an Error tagged with the HTTP status the gateway should answer with. */
  private static gatewayError(
    gatewayStatus: number,
    message: string
  ): Error & { gatewayStatus: number } {
    return Object.assign(new Error(message), { gatewayStatus });
  }

  /** Type guard for errors created by `gatewayError`. */
  private static isGatewayError(error: unknown): error is Error & { gatewayStatus: number } {
    return error instanceof Error && typeof Reflect.get(error, 'gatewayStatus') === 'number';
  }

  /**
   * Ensures a route has a path, at least one method, and a registered target service.
   * @throws Error describing the first violated rule.
   */
  private validateRoute(route: Route): void {
    if (!route.path) {
      throw new Error('路由路径不能为空');
    }
    if (!route.methods || route.methods.length === 0) {
      throw new Error('路由方法不能为空');
    }
    if (!route.serviceId) {
      throw new Error('服务ID不能为空');
    }
    if (!this.services.has(route.serviceId)) {
      throw new Error(`服务不存在: ${route.serviceId}`);
    }
  }

  /**
   * Ensures a service has a name and at least one instance, each with a URL.
   * @throws Error describing the first violated rule.
   */
  private validateService(service: Service): void {
    if (!service.name) {
      throw new Error('服务名称不能为空');
    }
    if (!service.instances || service.instances.length === 0) {
      throw new Error('服务实例不能为空');
    }
    for (const instance of service.instances) {
      if (!instance.url) {
        throw new Error('服务实例URL不能为空');
      }
    }
  }

  /**
   * Returns the enabled global middlewares in ascending `order`, followed by the
   * route-specific middlewares in their declared order.
   */
  private buildMiddlewareChain(route: Route): Middleware[] {
    const middlewares = Array.from(this.middlewares.values())
      .filter(m => m.enabled)
      .sort((a, b) => a.order - b.order);

    if (route.middlewares) {
      middlewares.push(...route.middlewares);
    }

    return middlewares;
  }

  /**
   * Runs the chain sequentially; each handler receives the request produced by the
   * previous one.
   * @throws the original error for deliberate gateway rejections (so their status
   *         survives), otherwise an Error naming the failing middleware.
   */
  private async executeMiddlewares(
    middlewares: Middleware[],
    context: MiddlewareContext
  ): Promise<GatewayRequest> {
    let request = context.request;

    for (const middleware of middlewares) {
      try {
        request = await middleware.handler({
          ...context,
          request
        });
      } catch (error) {
        if (LowCodeApiGatewayManager.isGatewayError(error)) {
          throw error;
        }
        const reason = error instanceof Error ? error.message : String(error);
        throw new Error(`中间件执行失败 [${middleware.name}]: ${reason}`);
      }
    }

    return request;
  }

  /**
   * Sends the request to the chosen instance and buffers the reply as text.
   * @throws whatever `fetch` throws on network failure.
   */
  private async forwardRequest(
    request: GatewayRequest,
    instance: ServiceInstance,
    route: Route
  ): Promise<GatewayResponse> {
    const targetUrl = this.buildTargetUrl(instance, route, request);

    // `fetch` rejects GET/HEAD requests that carry a body.
    const method = request.method.toUpperCase();
    const carriesBody = method !== 'GET' && method !== 'HEAD';

    // Hop-by-hop headers, Host and Content-Length describe the inbound connection and the
    // inbound body (which may have been re-serialised), so `fetch` must compute its own.
    const upstreamHeaders: Record<string, string> = {};
    for (const [name, value] of Object.entries(request.headers)) {
      if (!LowCodeApiGatewayManager.HOP_REQUEST_HEADERS.has(name.toLowerCase())) {
        upstreamHeaders[name] = value;
      }
    }

    const response = await fetch(targetUrl, {
      method: request.method,
      headers: upstreamHeaders,
      body: carriesBody ? request.body : undefined
    });
    const responseBody = await response.text();

    // `text()` returns the decoded body, so the upstream's encoding and length headers
    // would misdescribe what is relayed to the client.
    const responseHeaders: Record<string, string> = {};
    response.headers.forEach((value, name) => {
      if (!LowCodeApiGatewayManager.STALE_RESPONSE_HEADERS.has(name.toLowerCase())) {
        responseHeaders[name] = value;
      }
    });

    return {
      statusCode: response.status,
      headers: responseHeaders,
      body: responseBody
    };
  }

  /**
   * Builds the upstream URL: the route's `targetPath` (or the request path) with `{name}`
   * placeholders filled from `route.pathParams`, resolved against the instance URL, plus
   * the request's query parameters.
   */
  private buildTargetUrl(
    instance: ServiceInstance,
    route: Route,
    request: GatewayRequest
  ): string {
    let targetPath = route.targetPath || request.path;

    if (route.pathParams) {
      for (const [key, value] of Object.entries(route.pathParams)) {
        // split/join replaces every occurrence of the placeholder, not just the first.
        targetPath = targetPath.split(`{${key}}`).join(value);
      }
    }

    const url = new URL(targetPath, instance.url);

    if (request.queryParams) {
      for (const [key, value] of Object.entries(request.queryParams)) {
        url.searchParams.set(key, value);
      }
    }

    return url.toString();
  }

  /** Chooses the rate-limit bucket: per client IP, per `user-id` header, or per route. */
  private getRateLimitKey(request: GatewayRequest, route: Route): string {
    switch (route.rateLimit?.keyBy) {
      case 'ip':
        return `ip:${request.clientIp}`;
      case 'user':
        return `user:${request.headers['user-id'] || 'anonymous'}`;
      default:
        return `route:${route.id}`;
    }
  }

  /**
   * Returns a copy of the request with the transform's headers, query parameters and
   * JSON body fields merged over the original values.
   */
  private transformRequest(
    request: GatewayRequest,
    transform: RequestTransform
  ): GatewayRequest {
    const transformedRequest = { ...request };

    if (transform.headers) {
      transformedRequest.headers = {
        ...transformedRequest.headers,
        ...transform.headers
      };
    }

    if (transform.queryParams) {
      transformedRequest.queryParams = {
        ...transformedRequest.queryParams,
        ...transform.queryParams
      };
    }

    if (transform.body && transformedRequest.body) {
      try {
        const bodyObj = JSON.parse(transformedRequest.body);
        const transformedBody = { ...bodyObj, ...transform.body };
        transformedRequest.body = JSON.stringify(transformedBody);
      } catch {
        // Deliberate best effort: a non-JSON body is forwarded untouched.
      }
    }

    return transformedRequest;
  }

  /** Builds a JSON error response of the form `{ error: { code, message } }`. */
  private createErrorResponse(statusCode: number, message: string): GatewayResponse {
    return {
      statusCode,
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        error: {
          code: statusCode,
          message
        }
      })
    };
  }

  /** Generates a `gw_`-prefixed pseudo-random id; not cryptographically secure. */
  private generateId(): string {
    // `slice(2, 11)` yields the same characters as the deprecated `substr(2, 9)`.
    return 'gw_' + Math.random().toString(36).slice(2, 11);
  }
}
// API gateway options: collaborators that LowCodeApiGatewayManager takes from the caller
// instead of constructing itself.
interface ApiGatewayOptions {
// Validates bearer tokens for routes that require authentication.
authManager: AuthenticationManager;
// Receives the route/service/middleware lifecycle events emitted by the manager.
eventBus: EventBus;
}
## 13.2 核心数据结构

### 13.2.1 路由和服务定义

// Route definition: the caller-supplied configuration of a gateway route.
interface RouteDefinition {
// Path pattern; the router splits it on '/' and treats `{name}` segments as parameters.
path: string;
// HTTP methods served by the route; validation requires at least one.
methods: HttpMethod[];
// Id of the target service; validation requires it to be registered.
serviceId: string;
// Upstream path; the incoming request path is used when omitted.
targetPath?: string;
// When true, the authentication middleware requires an `authorization` header.
requireAuth?: boolean;
// Passed to the rate limiter by the rate-limiting middleware.
rateLimit?: RateLimitConfig;
// NOTE(review): timeout and retries are not read by the code visible here — confirm usage.
timeout?: number;
retries?: number;
// Route-specific middlewares, run after the enabled global ones.
middlewares?: Middleware[];
// Applied to the request by the transformation middleware.
requestTransform?: RequestTransform;
// NOTE(review): not read by the code visible here — confirm usage.
responseTransform?: ResponseTransform;
// Values substituted into `{name}` placeholders of the target path; the router fills
// this in with the parameters captured from the matched request path.
pathParams?: Record<string, string>;
description?: string;
tags?: string[];
}
// Route: a registered route definition plus the fields assigned by the gateway manager.
interface Route extends RouteDefinition {
// Generated by the manager when the route is created.
id: string;
status: RouteStatus;
createdAt: Date;
// Refreshed on every update.
updatedAt: Date;
}
// Route status. The request handler only proxies routes that are ACTIVE; any other status
// is answered with 503.
enum RouteStatus {
ACTIVE = 'active',
INACTIVE = 'inactive',
MAINTENANCE = 'maintenance'
}
// HTTP methods a route can be registered for; the values are the upper-case method names.
enum HttpMethod {
GET = 'GET',
POST = 'POST',
PUT = 'PUT',
DELETE = 'DELETE',
PATCH = 'PATCH',
HEAD = 'HEAD',
OPTIONS = 'OPTIONS'
}
// Service definition: the caller-supplied description of an upstream service.
interface ServiceDefinition {
// Required; validation rejects an empty name.
name: string;
description?: string;
version?: string;
// Required; validation demands at least one instance, each with a URL.
instances: ServiceInstance[];
// NOTE(review): the three configs below are not read by the manager code visible here —
// confirm where they are consumed.
healthCheck?: HealthCheckConfig;
loadBalancing?: LoadBalancingConfig;
circuitBreaker?: CircuitBreakerConfig;
// Used by service queries: a service matches when it carries any requested tag.
tags?: string[];
}
// Service: a registered service definition plus the fields assigned by the gateway manager.
interface Service extends ServiceDefinition {
// Generated by the manager at registration.
id: string;
status: ServiceStatus;
registeredAt: Date;
// Initialised to the registration time.
lastHealthCheck: Date;
}
// Service status. The request handler only forwards to ACTIVE services (others get 503),
// and the health summary counts only ACTIVE services as healthy.
enum ServiceStatus {
ACTIVE = 'active',
INACTIVE = 'inactive',
UNHEALTHY = 'unhealthy'
}
// Service instance: one addressable backend of a service.
interface ServiceInstance {
id: string;
// Base URL the target path is resolved against when forwarding.
url: string;
// Relative share for weighted balancing; the weighted balancer treats a missing weight as 1.
weight?: number;
// Load balancers only select instances whose status is HEALTHY.
status: InstanceStatus;
metadata?: Record<string, any>;
}
// Instance status. Only HEALTHY instances are eligible for selection by the load balancers.
enum InstanceStatus {
HEALTHY = 'healthy',
UNHEALTHY = 'unhealthy',
DRAINING = 'draining'
}
// Middleware definition: the caller-supplied description of a request middleware.
interface MiddlewareDefinition {
// Included in the error message when the handler throws.
name: string;
type: MiddlewareType;
// Global middlewares run in ascending order of this value.
order: number;
// Disabled global middlewares are left out of the chain.
enabled: boolean;
config: Record<string, any>;
// Receives the context and returns the request to hand to the next middleware.
handler: MiddlewareHandler;
}
// Middleware: a registered middleware definition plus the fields assigned by the manager.
interface Middleware extends MiddlewareDefinition {
// Generated by the manager when the middleware is added.
id: string;
createdAt: Date;
}
// Middleware categories. The built-in middlewares use AUTHENTICATION, RATE_LIMITING and
// TRANSFORMATION.
enum MiddlewareType {
AUTHENTICATION = 'authentication',
AUTHORIZATION = 'authorization',
RATE_LIMITING = 'rate_limiting',
TRANSFORMATION = 'transformation',
LOGGING = 'logging',
MONITORING = 'monitoring',
CORS = 'cors',
COMPRESSION = 'compression'
}
// Middleware handler: resolves to the (possibly replaced) request for the next middleware;
// rejecting aborts the chain.
type MiddlewareHandler = (context: MiddlewareContext) => Promise<GatewayRequest>;
// Middleware context: everything a handler gets to see for one request.
interface MiddlewareContext {
// The request as left by the previous middleware in the chain.
request: GatewayRequest;
route: Route;
service: Service;
// Scratch space shared along the chain; the authentication middleware stores the
// validated user under the key 'user'.
variables: Map<string, any>;
}
// Gateway request: the transport-independent form of an incoming HTTP request.
interface GatewayRequest {
path: string;
method: string;
headers: Record<string, string>;
// Copied onto the upstream URL's search parameters when forwarding.
queryParams?: Record<string, string>;
pathParams?: Record<string, string>;
// Serialised body; the request transform only rewrites it when it parses as JSON.
body?: string;
// Used as the rate-limit key when a route limits by 'ip'.
clientIp: string;
userAgent?: string;
// NOTE(review): never populated by the code visible here — confirm who sets it.
route?: Route;
}
// Gateway response: status, headers and fully buffered text body returned to the caller.
interface GatewayResponse {
statusCode: number;
headers: Record<string, string>;
body: string;
}
// Rate-limit configuration attached to a route.
interface RateLimitConfig {
requests: number;
window: number; // time window (seconds)
// Selects the bucket key: client IP, the `user-id` request header, or the route id.
keyBy: 'ip' | 'user' | 'route';
burst?: number;
}
// Health-check configuration for a service.
interface HealthCheckConfig {
path: string;
interval: number; // check interval (seconds)
timeout: number; // timeout (seconds)
// NOTE(review): presumably consecutive results needed to flip state — confirm against
// the health checker, which is not visible here.
healthyThreshold: number;
unhealthyThreshold: number;
}
// Load-balancing configuration for a service.
interface LoadBalancingConfig {
strategy: LoadBalancingStrategy;
// NOTE(review): sticky-session fields are not read by the balancers visible here.
stickySession?: boolean;
sessionKey?: string;
}
// Load-balancing strategies that can be named in a LoadBalancingConfig.
enum LoadBalancingStrategy {
ROUND_ROBIN = 'round_robin',
WEIGHTED_ROUND_ROBIN = 'weighted_round_robin',
LEAST_CONNECTIONS = 'least_connections',
RANDOM = 'random',
IP_HASH = 'ip_hash'
}
// Circuit-breaker configuration.
interface CircuitBreakerConfig {
failureThreshold: number;
// The default breaker moves an open circuit to half-open once this many seconds have
// elapsed since it opened.
recoveryTimeout: number; // recovery timeout (seconds)
monitoringPeriod: number; // monitoring period (seconds)
}
// Request transform: values merged over the outgoing request; on key clashes the
// transform's value wins.
interface RequestTransform {
headers?: Record<string, string>;
queryParams?: Record<string, string>;
// Merged into the request body only when that body parses as JSON.
body?: Record<string, any>;
}
// Response transform.
// NOTE(review): no code visible here applies it — presumably merged like RequestTransform;
// confirm against the implementation.
interface ResponseTransform {
headers?: Record<string, string>;
body?: Record<string, any>;
}
// Query interfaces.
// Route query: all filters are optional and combined with AND.
interface RouteQuery {
// Substring match against the route path.
path?: string;
method?: HttpMethod;
serviceId?: string;
status?: RouteStatus;
// Name of the Route property to sort by.
sortBy?: string;
// Ascending unless 'desc'.
sortOrder?: 'asc' | 'desc';
}
// Service query: all filters are optional and combined with AND.
interface ServiceQuery {
// Substring match against the service name.
name?: string;
status?: ServiceStatus;
// A service matches when it carries at least one of these tags.
tags?: string[];
}
// Metrics query, passed through to the metrics collector.
// NOTE(review): how each field filters is decided by the collector, which is not visible here.
interface MetricsQuery {
startTime?: Date;
endTime?: Date;
serviceId?: string;
routeId?: string;
}
// API metrics: the aggregate statistics returned by the metrics endpoint.
interface ApiMetrics {
requests: {
total: number;
successful: number;
failed: number;
rate: number; // requests per second
};
// NOTE(review): the time unit is not established by the code visible here.
latency: {
p50: number;
p95: number;
p99: number;
average: number;
};
errors: {
total: number;
rate: number; // errors per second
byStatusCode: Record<number, number>;
};
// NOTE(review): presumably keyed by service id and route id respectively — confirm.
services: Record<string, ServiceMetrics>;
routes: Record<string, RouteMetrics>;
}
// Service metrics: per-service entry of ApiMetrics.
interface ServiceMetrics {
requests: number;
errors: number;
latency: number;
availability: number;
}
// Route metrics: per-route entry of ApiMetrics.
interface RouteMetrics {
requests: number;
errors: number;
latency: number;
}
// Health status: the summary returned by the health endpoint.
interface HealthStatus {
// The manager reports 'healthy' when every service is ACTIVE and 'degraded' otherwise;
// the code visible here never produces 'unhealthy'.
status: 'healthy' | 'degraded' | 'unhealthy';
services: {
total: number;
// Number of services whose status is ACTIVE.
healthy: number;
unhealthy: number;
};
routes: {
total: number;
// Number of routes whose status is ACTIVE.
active: number;
inactive: number;
};
// Time at which the summary was computed.
timestamp: Date;
}
## 13.3 请求路由器

### 13.3.1 路由器接口和实现

// Request router interface: maintains the route table and resolves a path and method
// to a route.
interface RequestRouter {
addRoute(route: Route): void;
updateRoute(route: Route): void;
removeRoute(routeId: string): void;
// Returns null when no route matches both the path and the method.
findRoute(path: string, method: string): Route | null;
getRoutes(): Route[];
}
// Route node: one path segment in the router's trie.
interface RouteNode {
// Key of this node in its parent's `children` map; ':param' for parameter segments.
segment: string;
isParam: boolean;
// Parameter name taken from the `{name}` segment that created the node.
paramName?: string;
children: Map<string, RouteNode>;
routes: Map<string, Route>; // method -> route
}
/**
 * Trie-based request router.
 *
 * Each path segment is one trie level. Literal segments are keyed by their text and
 * `{name}` segments share the single key ':param'. Each node maps an HTTP method to the
 * route registered at that path.
 */
class RequestRouter implements RequestRouter {
  private root: RouteNode;

  constructor() {
    this.root = {
      segment: '',
      isParam: false,
      children: new Map(),
      routes: new Map()
    };
  }

  /** Inserts the route under its path, once per declared HTTP method. */
  addRoute(route: Route): void {
    let currentNode = this.root;

    for (const segment of this.parsePathSegments(route.path)) {
      const { key, isParam, paramName } = this.parseSegment(segment);
      let child = currentNode.children.get(key);
      if (!child) {
        child = {
          segment: key,
          isParam,
          paramName,
          children: new Map(),
          routes: new Map()
        };
        currentNode.children.set(key, child);
      }
      currentNode = child;
    }

    for (const method of route.methods) {
      currentNode.routes.set(method, route);
    }
  }

  /** Replaces a route by removing every registration of its id and re-adding it. */
  updateRoute(route: Route): void {
    this.removeRoute(route.id);
    this.addRoute(route);
  }

  /** Removes the route with the given id for all of its methods. */
  removeRoute(routeId: string): void {
    this.removeRouteFromNode(this.root, routeId);
  }

  /**
   * Resolves a request path and method to a route.
   *
   * Literal segments take precedence over parameter segments. If the literal branch has no
   * route for the method, the search backtracks into the parameter branch.
   *
   * @returns a shallow copy of the matched route whose `pathParams` holds the captured
   *          parameters, or null when nothing matches. A copy is returned so that
   *          concurrent requests never overwrite each other's parameters on the stored route.
   */
  findRoute(path: string, method: string): Route | null {
    const segments = this.parsePathSegments(path);
    const node = this.findRouteInNode(this.root, segments, 0, method);
    const route = node?.routes.get(method);
    if (!route) {
      return null;
    }
    return { ...route, pathParams: this.extractParams(route, segments) };
  }

  /** Returns every registered route exactly once, regardless of how many methods it has. */
  getRoutes(): Route[] {
    const routes: Route[] = [];
    this.collectRoutes(this.root, routes);
    return routes;
  }

  /** Splits a path on '/' and drops empty segments (leading, trailing, doubled slashes). */
  private parsePathSegments(path: string): string[] {
    return path.split('/').filter(segment => segment.length > 0);
  }

  /** Classifies a pattern segment as a literal or a `{name}` parameter. */
  private parseSegment(segment: string): {
    key: string;
    isParam: boolean;
    paramName?: string;
  } {
    if (segment.startsWith('{') && segment.endsWith('}')) {
      return {
        key: ':param',
        isParam: true,
        paramName: segment.slice(1, -1)
      };
    }
    return {
      key: segment,
      isParam: false
    };
  }

  /**
   * Reads parameter values from the request segments using the matched route's own
   * pattern. Routes that share a ':param' node therefore keep their own parameter names.
   */
  private extractParams(route: Route, segments: string[]): Record<string, string> {
    const params: Record<string, string> = {};
    this.parsePathSegments(route.path).forEach((pattern, position) => {
      const { isParam, paramName } = this.parseSegment(pattern);
      if (isParam && paramName) {
        params[paramName] = segments[position];
      }
    });
    return params;
  }

  /**
   * Depth-first search for the node that consumes all segments and has a route for
   * `method`. The literal child is tried first, then the parameter child.
   */
  private findRouteInNode(
    node: RouteNode,
    segments: string[],
    index: number,
    method: string
  ): RouteNode | null {
    if (index >= segments.length) {
      // A node without the requested method is not a match; returning null here is what
      // lets the caller fall back to the parameter branch.
      return node.routes.has(method) ? node : null;
    }

    const literalChild = node.children.get(segments[index]);
    if (literalChild) {
      const found = this.findRouteInNode(literalChild, segments, index + 1, method);
      if (found) {
        return found;
      }
    }

    const paramChild = node.children.get(':param');
    if (paramChild && paramChild !== literalChild) {
      return this.findRouteInNode(paramChild, segments, index + 1, method);
    }

    return null;
  }

  /**
   * Deletes every method entry of the route and prunes nodes left empty.
   * @returns true when the route was found in this subtree.
   */
  private removeRouteFromNode(node: RouteNode, routeId: string): boolean {
    let removedHere = false;
    for (const [method, route] of node.routes) {
      if (route.id === routeId) {
        // Keep scanning: a route registered for several methods has several entries.
        node.routes.delete(method);
        removedHere = true;
      }
    }
    if (removedHere) {
      return true;
    }

    for (const [key, child] of node.children) {
      if (this.removeRouteFromNode(child, routeId)) {
        if (child.routes.size === 0 && child.children.size === 0) {
          node.children.delete(key);
        }
        return true;
      }
    }

    return false;
  }

  /** Appends the subtree's routes to `routes`, skipping ids already collected. */
  private collectRoutes(node: RouteNode, routes: Route[]): void {
    for (const route of node.routes.values()) {
      if (!routes.some(r => r.id === route.id)) {
        routes.push(route);
      }
    }
    for (const child of node.children.values()) {
      this.collectRoutes(child, routes);
    }
  }
}
## 13.4 负载均衡器

### 13.4.1 负载均衡器接口和实现

// Load balancer interface: tracks services and picks the instance that serves a request.
interface LoadBalancer {
addService(service: Service): void;
updateService(service: Service): void;
removeService(serviceId: string): void;
// Returns null when the service has no eligible instance.
selectInstance(service: Service): ServiceInstance | null;
getServiceInstances(serviceId: string): ServiceInstance[];
}
/**
 * Round-robin load balancer: cycles through a service's healthy instances using one
 * counter per service.
 */
class RoundRobinLoadBalancer implements LoadBalancer {
  private serviceCounters: Map<string, number> = new Map();
  /** Instances of each registered service, kept so `getServiceInstances` can answer. */
  private serviceInstances: Map<string, ServiceInstance[]> = new Map();

  /** Registers a service and (re)starts its rotation at the first healthy instance. */
  addService(service: Service): void {
    this.serviceCounters.set(service.id, 0);
    this.serviceInstances.set(service.id, [...service.instances]);
  }

  /** Refreshes the stored instance list; the rotation position is kept. */
  updateService(service: Service): void {
    this.serviceInstances.set(service.id, [...service.instances]);
  }

  /** Forgets a service. */
  removeService(serviceId: string): void {
    this.serviceCounters.delete(serviceId);
    this.serviceInstances.delete(serviceId);
  }

  /**
   * Returns the next healthy instance in rotation, or null when none is healthy.
   * The instance list is read from the `service` argument, not from the stored snapshot.
   */
  selectInstance(service: Service): ServiceInstance | null {
    const healthyInstances = service.instances.filter(
      instance => instance.status === InstanceStatus.HEALTHY
    );
    if (healthyInstances.length === 0) {
      return null;
    }

    const counter = this.serviceCounters.get(service.id) ?? 0;
    this.serviceCounters.set(service.id, counter + 1);
    return healthyInstances[counter % healthyInstances.length];
  }

  /** Returns a copy of the instances last registered for the service, or [] if unknown. */
  getServiceInstances(serviceId: string): ServiceInstance[] {
    return [...(this.serviceInstances.get(serviceId) ?? [])];
  }
}
/**
 * Weighted round-robin load balancer using the smooth weighted algorithm. Every round each
 * eligible instance gains its weight, the instance with the highest running value is
 * chosen, and its value is reduced by the total weight of the eligible instances.
 */
class WeightedRoundRobinLoadBalancer implements LoadBalancer {
  private serviceWeights: Map<string, Map<string, number>> = new Map();
  private serviceCounters: Map<string, Map<string, number>> = new Map();
  /** Instances of each registered service, kept so `getServiceInstances` can answer. */
  private serviceInstances: Map<string, ServiceInstance[]> = new Map();

  /** Registers a service; a missing or zero weight counts as 1 and running values start at 0. */
  addService(service: Service): void {
    const weights = new Map<string, number>();
    const counters = new Map<string, number>();

    for (const instance of service.instances) {
      weights.set(instance.id, instance.weight || 1);
      counters.set(instance.id, 0);
    }

    this.serviceWeights.set(service.id, weights);
    this.serviceCounters.set(service.id, counters);
    this.serviceInstances.set(service.id, [...service.instances]);
  }

  /** Re-registers the service, resetting its weights and running values. */
  updateService(service: Service): void {
    this.removeService(service.id);
    this.addService(service);
  }

  /** Forgets a service. */
  removeService(serviceId: string): void {
    this.serviceWeights.delete(serviceId);
    this.serviceCounters.delete(serviceId);
    this.serviceInstances.delete(serviceId);
  }

  /**
   * Returns the healthy instance whose turn it is, or null when none is healthy.
   * A service that was never registered falls back to its first healthy instance.
   */
  selectInstance(service: Service): ServiceInstance | null {
    const healthyInstances = service.instances.filter(
      instance => instance.status === InstanceStatus.HEALTHY
    );
    if (healthyInstances.length === 0) {
      return null;
    }

    const weights = this.serviceWeights.get(service.id);
    const counters = this.serviceCounters.get(service.id);
    if (!weights || !counters) {
      return healthyInstances[0];
    }

    let selectedInstance: ServiceInstance | null = null;
    // Running values go negative after a selection, so the search must start below any
    // finite value, not at -1.
    let maxCurrentWeight = -Infinity;
    // Only the instances taking part in this round may contribute to the total.
    // Counting unhealthy ones makes the running values drift downward without bound.
    let eligibleWeight = 0;

    for (const instance of healthyInstances) {
      const weight = weights.get(instance.id) || 1;
      const currentWeight = (counters.get(instance.id) || 0) + weight;
      counters.set(instance.id, currentWeight);
      eligibleWeight += weight;

      if (currentWeight > maxCurrentWeight) {
        maxCurrentWeight = currentWeight;
        selectedInstance = instance;
      }
    }

    if (selectedInstance) {
      counters.set(selectedInstance.id, maxCurrentWeight - eligibleWeight);
    }

    return selectedInstance;
  }

  /** Returns a copy of the instances last registered for the service, or [] if unknown. */
  getServiceInstances(serviceId: string): ServiceInstance[] {
    return [...(this.serviceInstances.get(serviceId) ?? [])];
  }
}
/**
 * Least-connections load balancer: picks the healthy instance with the fewest open
 * connections. Callers must pair every successful `selectInstance` with a
 * `releaseConnection` once the request has finished.
 */
class LeastConnectionsLoadBalancer implements LoadBalancer {
  /** Open-connection count per instance id. */
  private instanceConnections: Map<string, number> = new Map();
  /** Instances of each registered service, needed to clean up counters on removal. */
  private serviceInstances: Map<string, ServiceInstance[]> = new Map();

  /** Registers a service and starts all of its instances at zero connections. */
  addService(service: Service): void {
    // Drop counters of a previous registration so replaced instances do not linger.
    this.removeService(service.id);
    for (const instance of service.instances) {
      this.instanceConnections.set(instance.id, 0);
    }
    this.serviceInstances.set(service.id, [...service.instances]);
  }

  /**
   * Keeps the counts of surviving instances, starts new instances at zero and drops the
   * counters of instances that are no longer part of the service.
   */
  updateService(service: Service): void {
    const currentIds = new Set(service.instances.map(instance => instance.id));
    for (const previous of this.serviceInstances.get(service.id) ?? []) {
      if (!currentIds.has(previous.id)) {
        this.instanceConnections.delete(previous.id);
      }
    }
    for (const instance of service.instances) {
      if (!this.instanceConnections.has(instance.id)) {
        this.instanceConnections.set(instance.id, 0);
      }
    }
    this.serviceInstances.set(service.id, [...service.instances]);
  }

  /** Forgets a service together with the counters of its instances. */
  removeService(serviceId: string): void {
    for (const instance of this.serviceInstances.get(serviceId) ?? []) {
      this.instanceConnections.delete(instance.id);
    }
    this.serviceInstances.delete(serviceId);
  }

  /**
   * Returns the healthy instance with the fewest open connections (the first one on a tie)
   * and counts one more connection against it; null when no instance is healthy.
   */
  selectInstance(service: Service): ServiceInstance | null {
    const healthyInstances = service.instances.filter(
      instance => instance.status === InstanceStatus.HEALTHY
    );
    if (healthyInstances.length === 0) {
      return null;
    }

    let selectedInstance = healthyInstances[0];
    let minConnections = this.instanceConnections.get(selectedInstance.id) || 0;
    for (const instance of healthyInstances) {
      const connections = this.instanceConnections.get(instance.id) || 0;
      if (connections < minConnections) {
        minConnections = connections;
        selectedInstance = instance;
      }
    }

    this.instanceConnections.set(selectedInstance.id, minConnections + 1);
    return selectedInstance;
  }

  /** Marks one connection to the instance as finished; the count never drops below zero. */
  releaseConnection(instanceId: string): void {
    const connections = this.instanceConnections.get(instanceId);
    // Ignore instances that are no longer tracked, so a late release after removal does
    // not resurrect a counter.
    if (connections === undefined) {
      return;
    }
    this.instanceConnections.set(instanceId, Math.max(0, connections - 1));
  }

  /** Returns a copy of the instances last registered for the service, or [] if unknown. */
  getServiceInstances(serviceId: string): ServiceInstance[] {
    return [...(this.serviceInstances.get(serviceId) ?? [])];
  }
}