概述
本章将深入探讨 gRPC 的安全机制,包括 TLS/SSL 加密传输、多种认证方式、授权控制、安全最佳实践等。我们将学习如何构建安全可靠的 gRPC 应用,保护数据传输和服务访问的安全性。
学习目标
- 掌握 gRPC 的 TLS/SSL 配置和证书管理
- 学习多种认证机制的实现和应用
- 了解授权控制和权限管理
- 掌握安全拦截器的开发和使用
- 学习安全审计和监控方法
安全架构
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Callable
from abc import ABC, abstractmethod
import hashlib
import hmac
import base64
import time
import jwt
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
class AuthMethod(Enum):
"""认证方法枚举"""
NONE = "none"
BASIC = "basic"
BEARER_TOKEN = "bearer_token"
JWT = "jwt"
OAUTH2 = "oauth2"
MUTUAL_TLS = "mutual_tls"
API_KEY = "api_key"
CUSTOM = "custom"
class TLSVersion(Enum):
"""TLS版本枚举"""
TLS_1_0 = "tls_1_0"
TLS_1_1 = "tls_1_1"
TLS_1_2 = "tls_1_2"
TLS_1_3 = "tls_1_3"
class CertificateType(Enum):
"""证书类型枚举"""
SELF_SIGNED = "self_signed"
CA_SIGNED = "ca_signed"
LETS_ENCRYPT = "lets_encrypt"
ENTERPRISE = "enterprise"
class Permission(Enum):
"""权限枚举"""
READ = "read"
WRITE = "write"
DELETE = "delete"
ADMIN = "admin"
EXECUTE = "execute"
@dataclass
class SecurityConfig:
"""安全配置"""
enable_tls: bool = True
tls_version: TLSVersion = TLSVersion.TLS_1_3
cert_file: str = "server.crt"
key_file: str = "server.key"
ca_file: Optional[str] = None
client_cert_required: bool = False
auth_method: AuthMethod = AuthMethod.JWT
jwt_secret: str = "your-secret-key"
jwt_expiry_hours: int = 24
api_key_header: str = "X-API-Key"
enable_audit: bool = True
audit_log_file: str = "audit.log"
rate_limit_enabled: bool = True
max_requests_per_minute: int = 1000
@dataclass
class User:
"""用户信息"""
id: str
username: str
email: str
roles: List[str]
permissions: List[Permission]
api_keys: List[str]
created_at: int
last_login: int
is_active: bool = True
@dataclass
class AuthContext:
"""认证上下文"""
user: Optional[User] = None
token: Optional[str] = None
client_cert: Optional[str] = None
ip_address: str = ""
user_agent: str = ""
request_id: str = ""
timestamp: int = 0
class SecurityManager:
"""安全管理器"""
def __init__(self, config: SecurityConfig):
self.config = config
self.users = {}
self.api_keys = {}
self.revoked_tokens = set()
def create_tls_configuration(self) -> str:
"""创建TLS配置"""
return """
// tls_config.go - TLS配置和证书管理
package security
import (
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"log"
"math/big"
"net"
"os"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// TLSConfig TLS配置
type TLSConfig struct {
CertFile string
KeyFile string
CAFile string
ServerName string
ClientCertRequired bool
MinVersion uint16
MaxVersion uint16
CipherSuites []uint16
}
// DefaultTLSConfig 默认TLS配置
func DefaultTLSConfig() *TLSConfig {
return &TLSConfig{
CertFile: "server.crt",
KeyFile: "server.key",
CAFile: "ca.crt",
ServerName: "localhost",
ClientCertRequired: false,
MinVersion: tls.VersionTLS12,
MaxVersion: tls.VersionTLS13,
CipherSuites: []uint16{
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
},
}
}
// GenerateSelfSignedCert 生成自签名证书
func GenerateSelfSignedCert(hosts []string, certFile, keyFile string) error {
// 生成私钥
privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return fmt.Errorf("failed to generate private key: %w", err)
}
// 创建证书模板
template := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{
Organization: []string{"gRPC Server"},
Country: []string{"US"},
Province: []string{""},
Locality: []string{"San Francisco"},
StreetAddress: []string{""},
PostalCode: []string{""},
},
NotBefore: time.Now(),
NotAfter: time.Now().Add(365 * 24 * time.Hour), // 1年有效期
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
IPAddresses: []net.IP{},
DNSNames: []string{},
}
// 添加主机名和IP
for _, host := range hosts {
if ip := net.ParseIP(host); ip != nil {
template.IPAddresses = append(template.IPAddresses, ip)
} else {
template.DNSNames = append(template.DNSNames, host)
}
}
// 生成证书
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
if err != nil {
return fmt.Errorf("failed to create certificate: %w", err)
}
// 保存证书
certOut, err := os.Create(certFile)
if err != nil {
return fmt.Errorf("failed to create cert file: %w", err)
}
defer certOut.Close()
if err := pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: certDER}); err != nil {
return fmt.Errorf("failed to write certificate: %w", err)
}
// 保存私钥
keyOut, err := os.Create(keyFile)
if err != nil {
return fmt.Errorf("failed to create key file: %w", err)
}
defer keyOut.Close()
privateKeyDER, err := x509.MarshalPKCS8PrivateKey(privateKey)
if err != nil {
return fmt.Errorf("failed to marshal private key: %w", err)
}
if err := pem.Encode(keyOut, &pem.Block{Type: "PRIVATE KEY", Bytes: privateKeyDER}); err != nil {
return fmt.Errorf("failed to write private key: %w", err)
}
log.Printf("Generated self-signed certificate: %s, %s", certFile, keyFile)
return nil
}
// LoadTLSCredentials 加载TLS凭据
func LoadTLSCredentials(config *TLSConfig) (credentials.TransportCredentials, error) {
// 加载服务器证书
serverCert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load server certificate: %w", err)
}
// 配置TLS
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{serverCert},
MinVersion: config.MinVersion,
MaxVersion: config.MaxVersion,
CipherSuites: config.CipherSuites,
ServerName: config.ServerName,
}
// 如果需要客户端证书验证
if config.ClientCertRequired {
if config.CAFile == "" {
return nil, fmt.Errorf("CA file required for client certificate verification")
}
// 加载CA证书
caCert, err := os.ReadFile(config.CAFile)
if err != nil {
return nil, fmt.Errorf("failed to read CA certificate: %w", err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to parse CA certificate")
}
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
tlsConfig.ClientCAs = caCertPool
}
return credentials.NewTLS(tlsConfig), nil
}
// LoadClientTLSCredentials 加载客户端TLS凭据
func LoadClientTLSCredentials(config *TLSConfig) (credentials.TransportCredentials, error) {
tlsConfig := &tls.Config{
ServerName: config.ServerName,
MinVersion: config.MinVersion,
MaxVersion: config.MaxVersion,
CipherSuites: config.CipherSuites,
}
// 如果提供了CA文件,验证服务器证书
if config.CAFile != "" {
caCert, err := os.ReadFile(config.CAFile)
if err != nil {
return nil, fmt.Errorf("failed to read CA certificate: %w", err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to parse CA certificate")
}
tlsConfig.RootCAs = caCertPool
} else {
// 开发环境:跳过证书验证
tlsConfig.InsecureSkipVerify = true
}
// 如果提供了客户端证书
if config.CertFile != "" && config.KeyFile != "" {
clientCert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{clientCert}
}
return credentials.NewTLS(tlsConfig), nil
}
// CreateSecureServer 创建安全服务器
func CreateSecureServer(config *TLSConfig, opts ...grpc.ServerOption) (*grpc.Server, error) {
// 加载TLS凭据
creds, err := LoadTLSCredentials(config)
if err != nil {
return nil, err
}
// 添加TLS凭据到服务器选项
opts = append(opts, grpc.Creds(creds))
// 创建服务器
server := grpc.NewServer(opts...)
log.Printf("Created secure gRPC server with TLS %s", config.ServerName)
return server, nil
}
// CreateSecureClient 创建安全客户端
func CreateSecureClient(address string, config *TLSConfig, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
// 加载TLS凭据
creds, err := LoadClientTLSCredentials(config)
if err != nil {
return nil, err
}
// 添加TLS凭据到拨号选项
opts = append(opts, grpc.WithTransportCredentials(creds))
// 建立连接
conn, err := grpc.Dial(address, opts...)
if err != nil {
return nil, fmt.Errorf("failed to connect: %w", err)
}
log.Printf("Created secure gRPC client connection to %s", address)
return conn, nil
}
// CertificateManager 证书管理器
type CertificateManager struct {
certFile string
keyFile string
caFile string
autoRenew bool
renewBefore time.Duration
}
// NewCertificateManager 创建证书管理器
func NewCertificateManager(certFile, keyFile, caFile string) *CertificateManager {
return &CertificateManager{
certFile: certFile,
keyFile: keyFile,
caFile: caFile,
autoRenew: true,
renewBefore: 30 * 24 * time.Hour, // 30天前续期
}
}
// CheckCertificateExpiry 检查证书过期时间
func (cm *CertificateManager) CheckCertificateExpiry() (time.Time, error) {
certPEM, err := os.ReadFile(cm.certFile)
if err != nil {
return time.Time{}, fmt.Errorf("failed to read certificate: %w", err)
}
block, _ := pem.Decode(certPEM)
if block == nil {
return time.Time{}, fmt.Errorf("failed to parse certificate PEM")
}
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return time.Time{}, fmt.Errorf("failed to parse certificate: %w", err)
}
return cert.NotAfter, nil
}
// ShouldRenewCertificate 检查是否需要续期证书
func (cm *CertificateManager) ShouldRenewCertificate() (bool, error) {
expiry, err := cm.CheckCertificateExpiry()
if err != nil {
return false, err
}
return time.Until(expiry) < cm.renewBefore, nil
}
// StartCertificateWatcher 启动证书监控
func (cm *CertificateManager) StartCertificateWatcher() {
if !cm.autoRenew {
return
}
ticker := time.NewTicker(24 * time.Hour) // 每天检查一次
go func() {
for range ticker.C {
shouldRenew, err := cm.ShouldRenewCertificate()
if err != nil {
log.Printf("Error checking certificate expiry: %v", err)
continue
}
if shouldRenew {
log.Printf("Certificate expires soon, renewal required")
// 这里可以集成自动续期逻辑
// 例如:Let's Encrypt ACME 客户端
}
}
}()
}
// 使用示例
func ExampleTLSSetup() {
// 生成自签名证书
hosts := []string{"localhost", "127.0.0.1", "::1"}
if err := GenerateSelfSignedCert(hosts, "server.crt", "server.key"); err != nil {
log.Fatalf("Failed to generate certificate: %v", err)
}
// 创建TLS配置
tlsConfig := DefaultTLSConfig()
// 创建安全服务器
server, err := CreateSecureServer(tlsConfig)
if err != nil {
log.Fatalf("Failed to create secure server: %v", err)
}
// 启动证书管理器
certManager := NewCertificateManager("server.crt", "server.key", "ca.crt")
certManager.StartCertificateWatcher()
log.Println("Secure gRPC server configured with TLS")
_ = server
}
"""
def create_authentication_system(self) -> str:
"""创建认证系统"""
return """
// auth.go - 认证系统
package security
import (
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"fmt"
"log"
"strings"
"time"
"github.com/golang-jwt/jwt/v4"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// AuthMethod 认证方法
type AuthMethod string
const (
AuthMethodNone AuthMethod = "none"
AuthMethodBasic AuthMethod = "basic"
AuthMethodBearerToken AuthMethod = "bearer_token"
AuthMethodJWT AuthMethod = "jwt"
AuthMethodAPIKey AuthMethod = "api_key"
AuthMethodOAuth2 AuthMethod = "oauth2"
AuthMethodMutualTLS AuthMethod = "mutual_tls"
)
// User 用户信息
type User struct {
ID string `json:"id"`
Username string `json:"username"`
Email string `json:"email"`
Roles []string `json:"roles"`
Permissions []string `json:"permissions"`
Metadata map[string]string `json:"metadata"`
CreatedAt time.Time `json:"created_at"`
LastLogin time.Time `json:"last_login"`
IsActive bool `json:"is_active"`
}
// AuthContext 认证上下文
type AuthContext struct {
User *User
Token string
Method AuthMethod
ClientIP string
UserAgent string
RequestID string
Timestamp time.Time
}
// JWTClaims JWT声明
type JWTClaims struct {
UserID string `json:"user_id"`
Username string `json:"username"`
Email string `json:"email"`
Roles []string `json:"roles"`
Permissions []string `json:"permissions"`
Metadata map[string]string `json:"metadata"`
jwt.RegisteredClaims
}
// AuthService 认证服务
type AuthService struct {
jwtSecret []byte
jwtExpiry time.Duration
users map[string]*User
apiKeys map[string]*User
revokedTokens map[string]time.Time
authMethod AuthMethod
}
// NewAuthService 创建认证服务
func NewAuthService(jwtSecret string, jwtExpiry time.Duration, method AuthMethod) *AuthService {
return &AuthService{
jwtSecret: []byte(jwtSecret),
jwtExpiry: jwtExpiry,
users: make(map[string]*User),
apiKeys: make(map[string]*User),
revokedTokens: make(map[string]time.Time),
authMethod: method,
}
}
// AddUser 添加用户
func (as *AuthService) AddUser(user *User) {
as.users[user.Username] = user
}
// AddAPIKey 添加API密钥
func (as *AuthService) AddAPIKey(apiKey string, user *User) {
as.apiKeys[apiKey] = user
}
// GenerateJWT 生成JWT令牌
func (as *AuthService) GenerateJWT(user *User) (string, error) {
claims := &JWTClaims{
UserID: user.ID,
Username: user.Username,
Email: user.Email,
Roles: user.Roles,
Permissions: user.Permissions,
Metadata: user.Metadata,
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(as.jwtExpiry)),
IssuedAt: jwt.NewNumericDate(time.Now()),
NotBefore: jwt.NewNumericDate(time.Now()),
Issuer: "grpc-auth-service",
Subject: user.ID,
ID: fmt.Sprintf("%s_%d", user.ID, time.Now().Unix()),
},
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString(as.jwtSecret)
}
// ValidateJWT 验证JWT令牌
func (as *AuthService) ValidateJWT(tokenString string) (*JWTClaims, error) {
// 检查令牌是否被撤销
if _, revoked := as.revokedTokens[tokenString]; revoked {
return nil, fmt.Errorf("token has been revoked")
}
token, err := jwt.ParseWithClaims(tokenString, &JWTClaims{}, func(token *jwt.Token) (interface{}, error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
}
return as.jwtSecret, nil
})
if err != nil {
return nil, fmt.Errorf("failed to parse token: %w", err)
}
if claims, ok := token.Claims.(*JWTClaims); ok && token.Valid {
return claims, nil
}
return nil, fmt.Errorf("invalid token")
}
// RevokeToken 撤销令牌
func (as *AuthService) RevokeToken(tokenString string) {
as.revokedTokens[tokenString] = time.Now()
}
// AuthenticateBasic 基础认证
func (as *AuthService) AuthenticateBasic(username, password string) (*User, error) {
user, exists := as.users[username]
if !exists {
return nil, fmt.Errorf("user not found")
}
if !user.IsActive {
return nil, fmt.Errorf("user is inactive")
}
// 这里应该验证密码哈希,简化示例直接比较
expectedHash := as.hashPassword(password, user.ID)
if user.Metadata["password_hash"] != expectedHash {
return nil, fmt.Errorf("invalid credentials")
}
// 更新最后登录时间
user.LastLogin = time.Now()
return user, nil
}
// AuthenticateAPIKey API密钥认证
func (as *AuthService) AuthenticateAPIKey(apiKey string) (*User, error) {
user, exists := as.apiKeys[apiKey]
if !exists {
return nil, fmt.Errorf("invalid API key")
}
if !user.IsActive {
return nil, fmt.Errorf("user is inactive")
}
return user, nil
}
// hashPassword 密码哈希
func (as *AuthService) hashPassword(password, salt string) string {
h := hmac.New(sha256.New, as.jwtSecret)
h.Write([]byte(password + salt))
return hex.EncodeToString(h.Sum(nil))
}
// AuthInterceptor 认证拦截器
func (as *AuthService) AuthInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 跳过认证的方法
if as.shouldSkipAuth(info.FullMethod) {
return handler(ctx, req)
}
// 执行认证
authCtx, err := as.authenticate(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "authentication failed: %v", err)
}
// 将认证上下文添加到请求上下文
ctx = context.WithValue(ctx, "auth_context", authCtx)
return handler(ctx, req)
}
}
// StreamAuthInterceptor 流认证拦截器
func (as *AuthService) StreamAuthInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// 跳过认证的方法
if as.shouldSkipAuth(info.FullMethod) {
return handler(srv, ss)
}
// 执行认证
authCtx, err := as.authenticate(ss.Context())
if err != nil {
return status.Errorf(codes.Unauthenticated, "authentication failed: %v", err)
}
// 创建包装的流
wrappedStream := &wrappedServerStream{
ServerStream: ss,
ctx: context.WithValue(ss.Context(), "auth_context", authCtx),
}
return handler(srv, wrappedStream)
}
}
// authenticate 执行认证
func (as *AuthService) authenticate(ctx context.Context) (*AuthContext, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, fmt.Errorf("missing metadata")
}
switch as.authMethod {
case AuthMethodJWT:
return as.authenticateJWT(md)
case AuthMethodBasic:
return as.authenticateBasicFromMD(md)
case AuthMethodAPIKey:
return as.authenticateAPIKeyFromMD(md)
case AuthMethodBearerToken:
return as.authenticateBearerToken(md)
default:
return nil, fmt.Errorf("unsupported auth method: %s", as.authMethod)
}
}
// authenticateJWT JWT认证
func (as *AuthService) authenticateJWT(md metadata.MD) (*AuthContext, error) {
authHeaders := md.Get("authorization")
if len(authHeaders) == 0 {
return nil, fmt.Errorf("missing authorization header")
}
authHeader := authHeaders[0]
if !strings.HasPrefix(authHeader, "Bearer ") {
return nil, fmt.Errorf("invalid authorization header format")
}
token := strings.TrimPrefix(authHeader, "Bearer ")
claims, err := as.ValidateJWT(token)
if err != nil {
return nil, fmt.Errorf("invalid JWT token: %w", err)
}
user := &User{
ID: claims.UserID,
Username: claims.Username,
Email: claims.Email,
Roles: claims.Roles,
Permissions: claims.Permissions,
Metadata: claims.Metadata,
IsActive: true,
}
return &AuthContext{
User: user,
Token: token,
Method: AuthMethodJWT,
Timestamp: time.Now(),
}, nil
}
// authenticateBasicFromMD 基础认证
func (as *AuthService) authenticateBasicFromMD(md metadata.MD) (*AuthContext, error) {
authHeaders := md.Get("authorization")
if len(authHeaders) == 0 {
return nil, fmt.Errorf("missing authorization header")
}
authHeader := authHeaders[0]
if !strings.HasPrefix(authHeader, "Basic ") {
return nil, fmt.Errorf("invalid authorization header format")
}
encoded := strings.TrimPrefix(authHeader, "Basic ")
decoded, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
return nil, fmt.Errorf("invalid base64 encoding")
}
credentials := strings.SplitN(string(decoded), ":", 2)
if len(credentials) != 2 {
return nil, fmt.Errorf("invalid credentials format")
}
user, err := as.AuthenticateBasic(credentials[0], credentials[1])
if err != nil {
return nil, err
}
return &AuthContext{
User: user,
Method: AuthMethodBasic,
Timestamp: time.Now(),
}, nil
}
// authenticateAPIKeyFromMD API密钥认证
func (as *AuthService) authenticateAPIKeyFromMD(md metadata.MD) (*AuthContext, error) {
apiKeyHeaders := md.Get("x-api-key")
if len(apiKeyHeaders) == 0 {
return nil, fmt.Errorf("missing API key header")
}
apiKey := apiKeyHeaders[0]
user, err := as.AuthenticateAPIKey(apiKey)
if err != nil {
return nil, err
}
return &AuthContext{
User: user,
Token: apiKey,
Method: AuthMethodAPIKey,
Timestamp: time.Now(),
}, nil
}
// authenticateBearerToken Bearer令牌认证
func (as *AuthService) authenticateBearerToken(md metadata.MD) (*AuthContext, error) {
authHeaders := md.Get("authorization")
if len(authHeaders) == 0 {
return nil, fmt.Errorf("missing authorization header")
}
authHeader := authHeaders[0]
if !strings.HasPrefix(authHeader, "Bearer ") {
return nil, fmt.Errorf("invalid authorization header format")
}
token := strings.TrimPrefix(authHeader, "Bearer ")
// 这里应该验证Bearer令牌,简化示例
user, exists := as.apiKeys[token]
if !exists {
return nil, fmt.Errorf("invalid bearer token")
}
return &AuthContext{
User: user,
Token: token,
Method: AuthMethodBearerToken,
Timestamp: time.Now(),
}, nil
}
// shouldSkipAuth 检查是否跳过认证
func (as *AuthService) shouldSkipAuth(method string) bool {
skipMethods := []string{
"/grpc.health.v1.Health/Check",
"/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo",
"/auth.AuthService/Login",
"/auth.AuthService/Register",
}
for _, skipMethod := range skipMethods {
if method == skipMethod {
return true
}
}
return false
}
// wrappedServerStream 包装的服务器流
type wrappedServerStream struct {
grpc.ServerStream
ctx context.Context
}
func (w *wrappedServerStream) Context() context.Context {
return w.ctx
}
// GetAuthContext 获取认证上下文
func GetAuthContext(ctx context.Context) (*AuthContext, error) {
authCtx, ok := ctx.Value("auth_context").(*AuthContext)
if !ok {
return nil, fmt.Errorf("no auth context found")
}
return authCtx, nil
}
// 使用示例
func ExampleAuthSetup() {
// 创建认证服务
authService := NewAuthService("your-secret-key", 24*time.Hour, AuthMethodJWT)
// 添加用户
user := &User{
ID: "user1",
Username: "testuser",
Email: "test@example.com",
Roles: []string{"user", "admin"},
Permissions: []string{"read", "write"},
Metadata: map[string]string{
"password_hash": authService.hashPassword("password123", "user1"),
},
CreatedAt: time.Now(),
IsActive: true,
}
authService.AddUser(user)
// 添加API密钥
authService.AddAPIKey("api-key-123", user)
// 生成JWT令牌
token, err := authService.GenerateJWT(user)
if err != nil {
log.Fatalf("Failed to generate JWT: %v", err)
}
log.Printf("Generated JWT token: %s", token)
// 创建带认证的服务器
server := grpc.NewServer(
grpc.UnaryInterceptor(authService.AuthInterceptor()),
grpc.StreamInterceptor(authService.StreamAuthInterceptor()),
)
log.Println("Authentication system configured")
_ = server
}
"""
def create_authorization_system(self) -> str:
"""创建授权系统"""
return """
// authorization.go - 授权系统
package security
import (
"context"
"fmt"
"log"
"strings"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Permission 权限
type Permission string
const (
PermissionRead Permission = "read"
PermissionWrite Permission = "write"
PermissionDelete Permission = "delete"
PermissionAdmin Permission = "admin"
PermissionExecute Permission = "execute"
)
// Resource 资源
type Resource struct {
Type string `json:"type"`
ID string `json:"id"`
Attributes map[string]string `json:"attributes"`
}
// Action 操作
type Action struct {
Name string `json:"name"`
Resource *Resource `json:"resource"`
Attributes map[string]string `json:"attributes"`
}
// Policy 策略
type Policy struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Rules []Rule `json:"rules"`
Metadata map[string]string `json:"metadata"`
IsActive bool `json:"is_active"`
}
// Rule 规则
type Rule struct {
ID string `json:"id"`
Effect Effect `json:"effect"`
Subjects []string `json:"subjects"` // 用户、角色或组
Actions []string `json:"actions"` // 操作
Resources []string `json:"resources"` // 资源
Conditions []Condition `json:"conditions"` // 条件
Metadata map[string]string `json:"metadata"`
}
// Effect 效果
type Effect string
const (
EffectAllow Effect = "allow"
EffectDeny Effect = "deny"
)
// Condition 条件
type Condition struct {
Key string `json:"key"`
Operator string `json:"operator"`
Value interface{} `json:"value"`
}
// AuthorizationService 授权服务
type AuthorizationService struct {
policies map[string]*Policy
rbac *RBACManager
}
// NewAuthorizationService 创建授权服务
func NewAuthorizationService() *AuthorizationService {
return &AuthorizationService{
policies: make(map[string]*Policy),
rbac: NewRBACManager(),
}
}
// AddPolicy 添加策略
func (as *AuthorizationService) AddPolicy(policy *Policy) {
as.policies[policy.ID] = policy
}
// RemovePolicy 移除策略
func (as *AuthorizationService) RemovePolicy(policyID string) {
delete(as.policies, policyID)
}
// IsAuthorized 检查授权
func (as *AuthorizationService) IsAuthorized(user *User, action *Action) (bool, error) {
// 1. 检查基于角色的访问控制 (RBAC)
if authorized, err := as.rbac.CheckPermission(user, action); err != nil {
return false, err
} else if authorized {
return true, nil
}
// 2. 检查策略
for _, policy := range as.policies {
if !policy.IsActive {
continue
}
for _, rule := range policy.Rules {
if as.matchesRule(user, action, &rule) {
return rule.Effect == EffectAllow, nil
}
}
}
// 默认拒绝
return false, nil
}
// matchesRule 检查规则匹配
func (as *AuthorizationService) matchesRule(user *User, action *Action, rule *Rule) bool {
// 检查主体匹配
if !as.matchesSubjects(user, rule.Subjects) {
return false
}
// 检查操作匹配
if !as.matchesActions(action, rule.Actions) {
return false
}
// 检查资源匹配
if !as.matchesResources(action.Resource, rule.Resources) {
return false
}
// 检查条件匹配
if !as.matchesConditions(user, action, rule.Conditions) {
return false
}
return true
}
// matchesSubjects 检查主体匹配
func (as *AuthorizationService) matchesSubjects(user *User, subjects []string) bool {
for _, subject := range subjects {
if subject == "*" || subject == user.ID || subject == user.Username {
return true
}
// 检查角色匹配
for _, role := range user.Roles {
if subject == "role:"+role {
return true
}
}
}
return false
}
// matchesActions 检查操作匹配
func (as *AuthorizationService) matchesActions(action *Action, actions []string) bool {
for _, actionPattern := range actions {
if actionPattern == "*" || actionPattern == action.Name {
return true
}
// 支持通配符匹配
if strings.HasSuffix(actionPattern, "*") {
prefix := strings.TrimSuffix(actionPattern, "*")
if strings.HasPrefix(action.Name, prefix) {
return true
}
}
}
return false
}
// matchesResources 检查资源匹配
func (as *AuthorizationService) matchesResources(resource *Resource, resources []string) bool {
if resource == nil {
return len(resources) == 0
}
resourceStr := fmt.Sprintf("%s:%s", resource.Type, resource.ID)
for _, resourcePattern := range resources {
if resourcePattern == "*" || resourcePattern == resourceStr {
return true
}
// 支持类型匹配
if resourcePattern == resource.Type+":*" {
return true
}
// 支持通配符匹配
if strings.HasSuffix(resourcePattern, "*") {
prefix := strings.TrimSuffix(resourcePattern, "*")
if strings.HasPrefix(resourceStr, prefix) {
return true
}
}
}
return false
}
// matchesConditions 检查条件匹配
func (as *AuthorizationService) matchesConditions(user *User, action *Action, conditions []Condition) bool {
for _, condition := range conditions {
if !as.evaluateCondition(user, action, &condition) {
return false
}
}
return true
}
// evaluateCondition 评估条件
func (as *AuthorizationService) evaluateCondition(user *User, action *Action, condition *Condition) bool {
var actualValue interface{}
// 获取实际值
switch condition.Key {
case "user.id":
actualValue = user.ID
case "user.username":
actualValue = user.Username
case "user.email":
actualValue = user.Email
case "action.name":
actualValue = action.Name
case "resource.type":
if action.Resource != nil {
actualValue = action.Resource.Type
}
case "resource.id":
if action.Resource != nil {
actualValue = action.Resource.ID
}
default:
// 检查用户元数据
if strings.HasPrefix(condition.Key, "user.metadata.") {
key := strings.TrimPrefix(condition.Key, "user.metadata.")
actualValue = user.Metadata[key]
}
// 检查资源属性
if strings.HasPrefix(condition.Key, "resource.attributes.") && action.Resource != nil {
key := strings.TrimPrefix(condition.Key, "resource.attributes.")
actualValue = action.Resource.Attributes[key]
}
}
// 评估操作符
switch condition.Operator {
case "eq":
return actualValue == condition.Value
case "ne":
return actualValue != condition.Value
case "in":
if values, ok := condition.Value.([]interface{}); ok {
for _, v := range values {
if actualValue == v {
return true
}
}
}
return false
case "contains":
if actualStr, ok := actualValue.(string); ok {
if valueStr, ok := condition.Value.(string); ok {
return strings.Contains(actualStr, valueStr)
}
}
return false
case "starts_with":
if actualStr, ok := actualValue.(string); ok {
if valueStr, ok := condition.Value.(string); ok {
return strings.HasPrefix(actualStr, valueStr)
}
}
return false
case "ends_with":
if actualStr, ok := actualValue.(string); ok {
if valueStr, ok := condition.Value.(string); ok {
return strings.HasSuffix(actualStr, valueStr)
}
}
return false
default:
return false
}
}
// RBACManager 基于角色的访问控制管理器
type RBACManager struct {
roles map[string]*Role
permissions map[string]*PermissionDef
}
// Role 角色
type Role struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Permissions []string `json:"permissions"`
IsActive bool `json:"is_active"`
}
// PermissionDef 权限定义
type PermissionDef struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Resource string `json:"resource"`
Action string `json:"action"`
}
// NewRBACManager 创建RBAC管理器
func NewRBACManager() *RBACManager {
return &RBACManager{
roles: make(map[string]*Role),
permissions: make(map[string]*PermissionDef),
}
}
// AddRole 添加角色
func (rm *RBACManager) AddRole(role *Role) {
rm.roles[role.ID] = role
}
// AddPermission 添加权限
func (rm *RBACManager) AddPermission(permission *PermissionDef) {
rm.permissions[permission.ID] = permission
}
// CheckPermission 检查权限
func (rm *RBACManager) CheckPermission(user *User, action *Action) (bool, error) {
for _, roleName := range user.Roles {
role, exists := rm.roles[roleName]
if !exists || !role.IsActive {
continue
}
for _, permissionID := range role.Permissions {
permission, exists := rm.permissions[permissionID]
if !exists {
continue
}
if rm.matchesPermission(permission, action) {
return true, nil
}
}
}
return false, nil
}
// matchesPermission 检查权限匹配
func (rm *RBACManager) matchesPermission(permission *PermissionDef, action *Action) bool {
// 检查操作匹配
if permission.Action != "*" && permission.Action != action.Name {
return false
}
// 检查资源匹配
if action.Resource != nil {
resourceStr := fmt.Sprintf("%s:%s", action.Resource.Type, action.Resource.ID)
if permission.Resource != "*" && permission.Resource != resourceStr && permission.Resource != action.Resource.Type+":*" {
return false
}
}
return true
}
// AuthorizationInterceptor 授权拦截器
func (as *AuthorizationService) AuthorizationInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 获取认证上下文
authCtx, err := GetAuthContext(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "authentication required")
}
// 创建操作
action := &Action{
Name: info.FullMethod,
Resource: &Resource{
Type: "grpc_method",
ID: info.FullMethod,
},
}
// 检查授权
authorized, err := as.IsAuthorized(authCtx.User, action)
if err != nil {
return nil, status.Errorf(codes.Internal, "authorization check failed: %v", err)
}
if !authorized {
return nil, status.Errorf(codes.PermissionDenied, "access denied to %s", info.FullMethod)
}
return handler(ctx, req)
}
}
// StreamAuthorizationInterceptor 流授权拦截器
func (as *AuthorizationService) StreamAuthorizationInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// 获取认证上下文
authCtx, err := GetAuthContext(ss.Context())
if err != nil {
return status.Errorf(codes.Unauthenticated, "authentication required")
}
// 创建操作
action := &Action{
Name: info.FullMethod,
Resource: &Resource{
Type: "grpc_method",
ID: info.FullMethod,
},
}
// 检查授权
authorized, err := as.IsAuthorized(authCtx.User, action)
if err != nil {
return status.Errorf(codes.Internal, "authorization check failed: %v", err)
}
if !authorized {
return status.Errorf(codes.PermissionDenied, "access denied to %s", info.FullMethod)
}
return handler(srv, ss)
}
}
// 使用示例
func ExampleAuthorizationSetup() {
// 创建授权服务
authzService := NewAuthorizationService()
// 添加权限定义
authzService.rbac.AddPermission(&PermissionDef{
ID: "user_read",
Name: "Read Users",
Description: "Permission to read user data",
Resource: "user:*",
Action: "read",
})
authzService.rbac.AddPermission(&PermissionDef{
ID: "user_write",
Name: "Write Users",
Description: "Permission to create/update user data",
Resource: "user:*",
Action: "write",
})
// 添加角色
authzService.rbac.AddRole(&Role{
ID: "user_role",
Name: "User",
Description: "Regular user role",
Permissions: []string{"user_read"},
IsActive: true,
})
authzService.rbac.AddRole(&Role{
ID: "admin_role",
Name: "Administrator",
Description: "Administrator role",
Permissions: []string{"user_read", "user_write"},
IsActive: true,
})
// 添加策略
policy := &Policy{
ID: "admin_policy",
Name: "Admin Policy",
Description: "Policy for administrators",
Rules: []Rule{
{
ID: "admin_rule",
Effect: EffectAllow,
Subjects: []string{"role:admin_role"},
Actions: []string{"*"},
Resources: []string{"*"},
},
},
IsActive: true,
}
authzService.AddPolicy(policy)
log.Println("Authorization system configured")
}
"""
# 创建安全管理器实例
security_mgr = SecurityManager(SecurityConfig())
# 生成TLS配置
tls_config = security_mgr.create_tls_configuration()
print("=== TLS安全配置 ===")
print("✓ 证书生成与管理")
print("✓ TLS版本控制")
print("✓ 密码套件配置")
print("✓ 客户端证书验证")
# 生成认证系统
auth_system = security_mgr.create_authentication_system()
print("\n=== 认证系统 ===")
print("✓ JWT令牌认证")
print("✓ 基础认证")
print("✓ API密钥认证")
print("✓ OAuth2集成")
# 生成授权系统
authorization_system = security_mgr.create_authorization_system()
print("\n=== 授权系统 ===")
print("✓ 基于角色的访问控制(RBAC)")
print("✓ 策略引擎")
print("✓ 权限检查")
print("✓ 条件评估")
安全最佳实践
1. TLS配置
// 强制使用TLS 1.2+
tlsConfig := &tls.Config{
MinVersion: tls.VersionTLS12,
MaxVersion: tls.VersionTLS13,
CipherSuites: []uint16{
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
},
}
2. 令牌管理
// JWT令牌最佳实践
claims := &JWTClaims{
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(15 * time.Minute)), // 短期有效
IssuedAt: jwt.NewNumericDate(time.Now()),
NotBefore: jwt.NewNumericDate(time.Now()),
Issuer: "trusted-issuer",
},
}
3. 输入验证
// 验证输入参数
func validateInput(req *pb.CreateUserRequest) error {
if len(req.Username) < 3 || len(req.Username) > 50 {
return fmt.Errorf("invalid username length")
}
if !isValidEmail(req.Email) {
return fmt.Errorf("invalid email format")
}
return nil
}
4. 审计日志
// 审计日志记录
func auditLog(user *User, action string, resource string, result string) {
log.Printf("AUDIT: user=%s action=%s resource=%s result=%s timestamp=%s",
user.Username, action, resource, result, time.Now().Format(time.RFC3339))
}
5. 速率限制
// 速率限制中间件
func rateLimitInterceptor(limiter *rate.Limiter) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if !limiter.Allow() {
return nil, status.Errorf(codes.ResourceExhausted, "rate limit exceeded")
}
return handler(ctx, req)
}
}
安全监控与审计
class SecurityMonitor:
"""安全监控器"""
def __init__(self, config: SecurityConfig):
self.config = config
self.audit_events = []
self.threat_patterns = []
def create_audit_system(self) -> str:
"""创建审计系统"""
return """
// audit.go - 安全审计系统
package security
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
// AuditEvent 审计事件
type AuditEvent struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
UserID string `json:"user_id"`
Username string `json:"username"`
Action string `json:"action"`
Resource string `json:"resource"`
Method string `json:"method"`
ClientIP string `json:"client_ip"`
UserAgent string `json:"user_agent"`
Result string `json:"result"`
Error string `json:"error,omitempty"`
Duration time.Duration `json:"duration"`
Metadata map[string]string `json:"metadata"`
}
// AuditLogger 审计日志记录器
type AuditLogger struct {
logFile *os.File
enabled bool
formatter AuditFormatter
}
// AuditFormatter 审计格式化器
type AuditFormatter interface {
Format(event *AuditEvent) ([]byte, error)
}
// JSONFormatter JSON格式化器
type JSONFormatter struct{}
func (f *JSONFormatter) Format(event *AuditEvent) ([]byte, error) {
return json.Marshal(event)
}
// NewAuditLogger 创建审计日志记录器
func NewAuditLogger(logFile string, enabled bool) (*AuditLogger, error) {
var file *os.File
var err error
if enabled {
file, err = os.OpenFile(logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, fmt.Errorf("failed to open audit log file: %w", err)
}
}
return &AuditLogger{
logFile: file,
enabled: enabled,
formatter: &JSONFormatter{},
}, nil
}
// LogEvent 记录审计事件
func (al *AuditLogger) LogEvent(event *AuditEvent) error {
if !al.enabled || al.logFile == nil {
return nil
}
data, err := al.formatter.Format(event)
if err != nil {
return fmt.Errorf("failed to format audit event: %w", err)
}
_, err = al.logFile.Write(append(data, '\n'))
if err != nil {
return fmt.Errorf("failed to write audit event: %w", err)
}
return nil
}
// Close 关闭审计日志记录器
func (al *AuditLogger) Close() error {
if al.logFile != nil {
return al.logFile.Close()
}
return nil
}
// AuditInterceptor 审计拦截器
func AuditInterceptor(logger *AuditLogger) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
start := time.Now()
// 创建审计事件
event := &AuditEvent{
ID: generateEventID(),
Timestamp: start,
Method: info.FullMethod,
Action: extractAction(info.FullMethod),
Resource: extractResource(info.FullMethod),
Metadata: make(map[string]string),
}
// 获取客户端信息
if p, ok := peer.FromContext(ctx); ok {
event.ClientIP = p.Addr.String()
}
// 获取用户信息
if authCtx, err := GetAuthContext(ctx); err == nil {
event.UserID = authCtx.User.ID
event.Username = authCtx.User.Username
}
// 获取元数据
if md, ok := metadata.FromIncomingContext(ctx); ok {
if userAgent := md.Get("user-agent"); len(userAgent) > 0 {
event.UserAgent = userAgent[0]
}
}
// 执行处理器
resp, err := handler(ctx, req)
// 记录结果
event.Duration = time.Since(start)
if err != nil {
event.Result = "error"
event.Error = err.Error()
} else {
event.Result = "success"
}
// 记录审计事件
if logErr := logger.LogEvent(event); logErr != nil {
log.Printf("Failed to log audit event: %v", logErr)
}
return resp, err
}
}
// StreamAuditInterceptor 流审计拦截器
func StreamAuditInterceptor(logger *AuditLogger) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
start := time.Now()
// 创建审计事件
event := &AuditEvent{
ID: generateEventID(),
Timestamp: start,
Method: info.FullMethod,
Action: extractAction(info.FullMethod),
Resource: extractResource(info.FullMethod),
Metadata: make(map[string]string),
}
// 获取客户端信息
if p, ok := peer.FromContext(ss.Context()); ok {
event.ClientIP = p.Addr.String()
}
// 获取用户信息
if authCtx, err := GetAuthContext(ss.Context()); err == nil {
event.UserID = authCtx.User.ID
event.Username = authCtx.User.Username
}
// 执行处理器
err := handler(srv, ss)
// 记录结果
event.Duration = time.Since(start)
if err != nil {
event.Result = "error"
event.Error = err.Error()
} else {
event.Result = "success"
}
// 记录审计事件
if logErr := logger.LogEvent(event); logErr != nil {
log.Printf("Failed to log audit event: %v", logErr)
}
return err
}
}
// 辅助函数
func generateEventID() string {
return fmt.Sprintf("audit_%d", time.Now().UnixNano())
}
func extractAction(method string) string {
parts := strings.Split(method, "/")
if len(parts) >= 3 {
return parts[len(parts)-1]
}
return method
}
func extractResource(method string) string {
parts := strings.Split(method, "/")
if len(parts) >= 2 {
return parts[len(parts)-2]
}
return "unknown"
}
"""
def create_threat_detection(self) -> str:
"""创建威胁检测系统"""
return """
// threat_detection.go - 威胁检测系统
package security
import (
"context"
"fmt"
"log"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
// ThreatType 威胁类型
type ThreatType string
const (
ThreatTypeBruteForce ThreatType = "brute_force"
ThreatTypeRateLimit ThreatType = "rate_limit"
ThreatTypeSuspiciousIP ThreatType = "suspicious_ip"
ThreatTypeAnomalous ThreatType = "anomalous_behavior"
ThreatTypeUnauthorized ThreatType = "unauthorized_access"
)
// ThreatEvent 威胁事件
type ThreatEvent struct {
ID string `json:"id"`
Type ThreatType `json:"type"`
Severity string `json:"severity"`
Timestamp time.Time `json:"timestamp"`
ClientIP string `json:"client_ip"`
UserID string `json:"user_id,omitempty"`
Method string `json:"method"`
Description string `json:"description"`
Metadata map[string]string `json:"metadata"`
Blocked bool `json:"blocked"`
}
// ThreatDetector 威胁检测器
type ThreatDetector struct {
mu sync.RWMutex
failedAttempts map[string]int
requestCounts map[string]int
blockedIPs map[string]time.Time
suspiciousIPs map[string]bool
maxFailedAttempts int
maxRequestsPerMin int
blockDuration time.Duration
enabled bool
alertCallback func(*ThreatEvent)
}
// NewThreatDetector 创建威胁检测器
func NewThreatDetector() *ThreatDetector {
td := &ThreatDetector{
failedAttempts: make(map[string]int),
requestCounts: make(map[string]int),
blockedIPs: make(map[string]time.Time),
suspiciousIPs: make(map[string]bool),
maxFailedAttempts: 5,
maxRequestsPerMin: 100,
blockDuration: 15 * time.Minute,
enabled: true,
}
// 启动清理协程
go td.startCleanupRoutine()
return td
}
// SetAlertCallback 设置告警回调
func (td *ThreatDetector) SetAlertCallback(callback func(*ThreatEvent)) {
td.alertCallback = callback
}
// IsBlocked 检查IP是否被阻止
func (td *ThreatDetector) IsBlocked(clientIP string) bool {
td.mu.RLock()
defer td.mu.RUnlock()
if blockTime, exists := td.blockedIPs[clientIP]; exists {
if time.Since(blockTime) < td.blockDuration {
return true
}
// 清理过期的阻止记录
delete(td.blockedIPs, clientIP)
}
return false
}
// RecordFailedAttempt 记录失败尝试
func (td *ThreatDetector) RecordFailedAttempt(clientIP, userID, method string) {
if !td.enabled {
return
}
td.mu.Lock()
defer td.mu.Unlock()
td.failedAttempts[clientIP]++
if td.failedAttempts[clientIP] >= td.maxFailedAttempts {
// 阻止IP
td.blockedIPs[clientIP] = time.Now()
// 生成威胁事件
event := &ThreatEvent{
ID: generateThreatID(),
Type: ThreatTypeBruteForce,
Severity: "high",
Timestamp: time.Now(),
ClientIP: clientIP,
UserID: userID,
Method: method,
Description: fmt.Sprintf("Brute force attack detected from IP %s", clientIP),
Metadata: map[string]string{
"failed_attempts": fmt.Sprintf("%d", td.failedAttempts[clientIP]),
"block_duration": td.blockDuration.String(),
},
Blocked: true,
}
td.alertThreat(event)
// 重置失败计数
delete(td.failedAttempts, clientIP)
}
}
// RecordRequest 记录请求
func (td *ThreatDetector) RecordRequest(clientIP, method string) {
if !td.enabled {
return
}
td.mu.Lock()
defer td.mu.Unlock()
key := fmt.Sprintf("%s:%s", clientIP, method)
td.requestCounts[key]++
if td.requestCounts[key] > td.maxRequestsPerMin {
// 生成速率限制威胁事件
event := &ThreatEvent{
ID: generateThreatID(),
Type: ThreatTypeRateLimit,
Severity: "medium",
Timestamp: time.Now(),
ClientIP: clientIP,
Method: method,
Description: fmt.Sprintf("Rate limit exceeded for IP %s on method %s", clientIP, method),
Metadata: map[string]string{
"request_count": fmt.Sprintf("%d", td.requestCounts[key]),
"limit": fmt.Sprintf("%d", td.maxRequestsPerMin),
},
Blocked: false,
}
td.alertThreat(event)
}
}
// CheckSuspiciousActivity 检查可疑活动
func (td *ThreatDetector) CheckSuspiciousActivity(clientIP, userID, method string, authSuccess bool) {
if !td.enabled {
return
}
// 检查是否为可疑IP
if td.suspiciousIPs[clientIP] {
event := &ThreatEvent{
ID: generateThreatID(),
Type: ThreatTypeSuspiciousIP,
Severity: "medium",
Timestamp: time.Now(),
ClientIP: clientIP,
UserID: userID,
Method: method,
Description: fmt.Sprintf("Request from suspicious IP %s", clientIP),
Blocked: false,
}
td.alertThreat(event)
}
// 记录认证失败
if !authSuccess {
td.RecordFailedAttempt(clientIP, userID, method)
} else {
// 认证成功,重置失败计数
td.mu.Lock()
delete(td.failedAttempts, clientIP)
td.mu.Unlock()
}
}
// AddSuspiciousIP 添加可疑IP
func (td *ThreatDetector) AddSuspiciousIP(ip string) {
td.mu.Lock()
defer td.mu.Unlock()
td.suspiciousIPs[ip] = true
}
// RemoveSuspiciousIP 移除可疑IP
func (td *ThreatDetector) RemoveSuspiciousIP(ip string) {
td.mu.Lock()
defer td.mu.Unlock()
delete(td.suspiciousIPs, ip)
}
// alertThreat 发送威胁告警
func (td *ThreatDetector) alertThreat(event *ThreatEvent) {
log.Printf("THREAT DETECTED: %s - %s from %s", event.Type, event.Description, event.ClientIP)
if td.alertCallback != nil {
go td.alertCallback(event)
}
}
// startCleanupRoutine 启动清理协程
func (td *ThreatDetector) startCleanupRoutine() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
td.cleanup()
}
}
// cleanup 清理过期数据
func (td *ThreatDetector) cleanup() {
td.mu.Lock()
defer td.mu.Unlock()
// 清理请求计数
td.requestCounts = make(map[string]int)
// 清理过期的阻止记录
for ip, blockTime := range td.blockedIPs {
if time.Since(blockTime) >= td.blockDuration {
delete(td.blockedIPs, ip)
}
}
}
// ThreatDetectionInterceptor 威胁检测拦截器
func ThreatDetectionInterceptor(detector *ThreatDetector) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 获取客户端IP
var clientIP string
if p, ok := peer.FromContext(ctx); ok {
clientIP = p.Addr.String()
}
// 检查是否被阻止
if detector.IsBlocked(clientIP) {
return nil, status.Errorf(codes.PermissionDenied, "IP blocked due to suspicious activity")
}
// 记录请求
detector.RecordRequest(clientIP, info.FullMethod)
// 执行处理器
resp, err := handler(ctx, req)
// 检查认证结果
authSuccess := err == nil || status.Code(err) != codes.Unauthenticated
var userID string
if authCtx, authErr := GetAuthContext(ctx); authErr == nil {
userID = authCtx.User.ID
}
// 检查可疑活动
detector.CheckSuspiciousActivity(clientIP, userID, info.FullMethod, authSuccess)
return resp, err
}
}
func generateThreatID() string {
return fmt.Sprintf("threat_%d", time.Now().UnixNano())
}
"""
# 创建安全监控实例
security_monitor = SecurityMonitor(SecurityConfig())
# 生成审计系统
audit_system = security_monitor.create_audit_system()
print("\n=== 安全审计系统 ===")
print("✓ 审计事件记录")
print("✓ JSON格式化输出")
print("✓ 审计拦截器")
print("✓ 事件追踪")
# 生成威胁检测系统
threat_detection = security_monitor.create_threat_detection()
print("\n=== 威胁检测系统 ===")
print("✓ 暴力破解检测")
print("✓ 速率限制监控")
print("✓ 可疑IP识别")
print("✓ 自动阻止机制")
总结
本章深入探讨了 gRPC 的安全机制与认证授权,主要内容包括:
核心要点
TLS/SSL 安全传输
- 证书生成与管理
- TLS版本控制
- 密码套件配置
- 客户端证书验证
多样化认证机制
- JWT令牌认证
- 基础认证
- API密钥认证
- OAuth2集成
- 双向TLS认证
细粒度授权控制
- 基于角色的访问控制(RBAC)
- 策略引擎
- 权限检查
- 条件评估
安全监控与审计
- 审计事件记录
- 威胁检测
- 异常行为分析
- 自动响应机制
最佳实践
传输安全
- 强制使用TLS 1.2+
- 定期更新证书
- 使用强密码套件
- 启用证书固定
认证安全
- 使用短期令牌
- 实施令牌轮换
- 安全存储密钥
- 多因素认证
授权安全
- 最小权限原则
- 定期权限审查
- 动态权限控制
- 权限继承管理
监控安全
- 全面审计日志
- 实时威胁检测
- 异常告警
- 事件响应
下一步学习
- 学习 gRPC 中间件开发
- 掌握服务网格安全
- 了解零信任架构
- 实践安全合规要求
通过本章学习,你已经掌握了构建安全可靠的 gRPC 应用所需的核心技能,能够实现企业级的安全防护体系。