12.1 Overview

As a mature container orchestration platform, Kubernetes provides a rich set of advanced features and powerful extension mechanisms. This chapter explores them in depth, covering Custom Resource Definitions (CRDs), the Operator pattern, admission controllers, scheduler extensions, network plugin development, and more, so that you can better understand and extend Kubernetes' capabilities.

12.1.1 Advanced Feature Overview

  • Custom Resource Definitions (CRD): extend the Kubernetes API
  • Operator pattern: automate application management
  • Admission controllers: validate and mutate API requests
  • Scheduler extensions: custom scheduling logic
  • Network plugins: CNI plugin development
  • Storage plugins: CSI driver development
  • Device plugins: manage special resources such as GPUs

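Each of these extension points can be inspected on a running cluster before you build anything new. A quick survey with kubectl (a sketch; the output depends on what is installed):

# Survey the extension points of an existing cluster
kubectl get crds                                  # CustomResourceDefinitions
kubectl get apiservices                           # aggregated API servers
kubectl get validatingwebhookconfigurations       # validating admission webhooks
kubectl get mutatingwebhookconfigurations         # mutating admission webhooks
kubectl get csidrivers,csinodes                   # registered CSI drivers
ls /etc/cni/net.d/                                # CNI configuration (run on a node)
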
12.1.2 Extension Mechanism Architecture

# Kubernetes extension architecture diagram
apiVersion: v1
kind: ConfigMap
metadata:
  name: k8s-extension-architecture
data:
  architecture.md: |
    # Kubernetes Extension Architecture

    ## API extension layer
    ┌─────────────────────────────────────────┐
    │          kubectl / API clients          │
    └─────────────────┬───────────────────────┘
                      │
    ┌─────────────────▼───────────────────────┐
    │            kube-apiserver               │
    │  ┌─────────────────────────────────────┐ │
    │  │     admission controller chain      │ │
    │  │  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐   │ │
    │  │  │ VAC │ │ MAC │ │ ... │ │ WAC │   │ │
    │  │  └─────┘ └─────┘ └─────┘ └─────┘   │ │
    │  └─────────────────────────────────────┘ │
    │  ┌─────────────────────────────────────┐ │
    │  │        API aggregation layer        │ │
    │  │  ┌─────────────┐ ┌─────────────┐   │ │
    │  │  │  core APIs  │ │extended APIs│   │ │
    │  │  │             │ │    (CRD)    │   │ │
    │  │  └─────────────┘ └─────────────┘   │ │
    │  └─────────────────────────────────────┘ │
    └─────────────────┬───────────────────────┘
                      │
    ┌─────────────────▼───────────────────────┐
    │             etcd storage                │
    └─────────────────────────────────────────┘

    ## Controller extension layer
    ┌─────────────────────────────────────────┐
    │        kube-controller-manager          │
    │  ┌─────────────────────────────────────┐ │
    │  │        built-in controllers         │ │
    │  │  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐   │ │
    │  │  │ RC  │ │ RS  │ │ Dep │ │ ... │   │ │
    │  │  └─────┘ └─────┘ └─────┘ └─────┘   │ │
    │  └─────────────────────────────────────┘ │
    └─────────────────────────────────────────┘

    ┌─────────────────────────────────────────┐
    │           custom controllers            │
    │  ┌─────────────────────────────────────┐ │
    │  │             Operator                │ │
    │  │  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐   │ │
    │  │  │ DB  │ │ MQ  │ │ App │ │ ... │   │ │
    │  │  └─────┘ └─────┘ └─────┘ └─────┘   │ │
    │  └─────────────────────────────────────┘ │
    └─────────────────────────────────────────┘

    ## Runtime extension layer
    ┌─────────────────────────────────────────┐
    │               kubelet                   │
    │  ┌─────────────────────────────────────┐ │
    │  │          plugin interfaces          │ │
    │  │  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐   │ │
    │  │  │ CNI │ │ CSI │ │ CRI │ │ DPI │   │ │
    │  │  └─────┘ └─────┘ └─────┘ └─────┘   │ │
    │  └─────────────────────────────────────┘ │
    └─────────────────────────────────────────┘

    ## Scheduling extension layer
    ┌─────────────────────────────────────────┐
    │            kube-scheduler               │
    │  ┌─────────────────────────────────────┐ │
    │  │        scheduling framework         │ │
    │  │  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐   │ │
    │  │  │ Pre │ │Filt │ │Score│ │Post │   │ │
    │  │  └─────┘ └─────┘ └─────┘ └─────┘   │ │
    │  └─────────────────────────────────────┘ │
    └─────────────────────────────────────────┘

12.2 Custom Resource Definitions (CRD)

12.2.1 CRD Basics

A Custom Resource Definition (CRD) lets you extend the Kubernetes API with your own resource types.

# Basic CRD example
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: webapps.example.com
spec:
  group: example.com
  versions:
  - name: v1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              replicas:
                type: integer
                minimum: 1
                maximum: 100
              image:
                type: string
              port:
                type: integer
                minimum: 1
                maximum: 65535
              resources:
                type: object
                properties:
                  cpu:
                    type: string
                  memory:
                    type: string
            required:
            - replicas
            - image
            - port
          status:
            type: object
            properties:
              phase:
                type: string
                enum:
                - Pending
                - Running
                - Failed
              readyReplicas:
                type: integer
              conditions:
                type: array
                items:
                  type: object
                  properties:
                    type:
                      type: string
                    status:
                      type: string
                    reason:
                      type: string
                    message:
                      type: string
                    lastTransitionTime:
                      type: string
                      format: date-time
        required:
        - spec
    additionalPrinterColumns:
    - name: Replicas
      type: integer
      description: The number of replicas
      jsonPath: .spec.replicas
    - name: Ready
      type: integer
      description: The number of ready replicas
      jsonPath: .status.readyReplicas
    - name: Phase
      type: string
      description: The phase of the webapp
      jsonPath: .status.phase
    - name: Age
      type: date
      jsonPath: .metadata.creationTimestamp
  scope: Namespaced
  names:
    plural: webapps
    singular: webapp
    kind: WebApp
    shortNames:
    - wa

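Once this CRD is applied, the webapps API, its short name, and the printer columns can be exercised directly with kubectl. For example (assuming the manifest above is saved as webapp-crd.yaml):

# Register the CRD and verify the new API
kubectl apply -f webapp-crd.yaml
kubectl get crd webapps.example.com
kubectl api-resources --api-group=example.com
kubectl get webapps --all-namespaces    # equivalent to: kubectl get wa
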
12.2.2 Advanced CRD Features

# Advanced CRD feature example
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: databases.db.example.com
spec:
  group: db.example.com
  versions:
  - name: v1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              type:
                type: string
                enum:
                - mysql
                - postgresql
                - mongodb
              version:
                type: string
                pattern: '^[0-9]+\.[0-9]+\.[0-9]+$'
              storage:
                type: object
                properties:
                  size:
                    type: string
                    pattern: '^[0-9]+[KMGT]i?$'
                  storageClass:
                    type: string
                required:
                - size
              backup:
                type: object
                properties:
                  enabled:
                    type: boolean
                  schedule:
                    type: string
                    pattern: '^(@(annually|yearly|monthly|weekly|daily|hourly|reboot))|(@every (\d+(ns|us|µs|ms|s|m|h))+)|((((\d+,)+\d+|(\d+([/-])\d+)|\d+|\*) ?){5,7})$'
                  retention:
                    type: integer
                    minimum: 1
                    maximum: 365
              monitoring:
                type: object
                properties:
                  enabled:
                    type: boolean
                  metrics:
                    type: array
                    items:
                      type: string
                      enum:
                      - cpu
                      - memory
                      - disk
                      - connections
                      - queries
            required:
            - type
            - version
            - storage
          status:
            type: object
            properties:
              phase:
                type: string
                enum:
                - Pending
                - Creating
                - Running
                - Updating
                - Deleting
                - Failed
              conditions:
                type: array
                items:
                  type: object
                  properties:
                    type:
                      type: string
                    status:
                      type: string
                      enum:
                      - "True"
                      - "False"
                      - "Unknown"
                    reason:
                      type: string
                    message:
                      type: string
                    lastTransitionTime:
                      type: string
                      format: date-time
              endpoints:
                type: object
                properties:
                  primary:
                    type: string
                  readonly:
                    type: array
                    items:
                      type: string
              backupStatus:
                type: object
                properties:
                  lastBackup:
                    type: string
                    format: date-time
                  nextBackup:
                    type: string
                    format: date-time
                  backupCount:
                    type: integer
    # Subresource support (note: the scale paths below assume spec.replicas / status.replicas exist in the schema)
    subresources:
      status: {}
      scale:
        specReplicasPath: .spec.replicas
        statusReplicasPath: .status.replicas
        labelSelectorPath: .status.selector
    # Conversion webhook
    conversion:
      strategy: Webhook
      webhook:
        clientConfig:
          service:
            namespace: db-system
            name: db-conversion-webhook
            path: /convert
        conversionReviewVersions:
        - v1
        - v1beta1
  - name: v1beta1
    served: true
    storage: false
    deprecated: true
    deprecationWarning: "db.example.com/v1beta1 Database is deprecated; use db.example.com/v1 Database"
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            x-kubernetes-preserve-unknown-fields: true
          status:
            type: object
            x-kubernetes-preserve-unknown-fields: true
  scope: Namespaced
  names:
    plural: databases
    singular: database
    kind: Database
    shortNames:
    - db
    categories:
    - all
    - storage

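The categories and the deprecated v1beta1 version declared above are both visible from kubectl. A few commands to confirm the behavior (assuming the CRD is installed):

# Check served versions, categories and the deprecation warning
kubectl get crd databases.db.example.com -o jsonpath='{.spec.versions[*].name}{"\n"}'
kubectl get all -n production        # Database objects appear via the "all" category
kubectl get databases.v1beta1.db.example.com -n production   # prints the deprecation warning
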
12.2.3 CRD Validation and Webhooks

# Validating webhook configuration
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: database-validator
webhooks:
- name: database.validator.example.com
  clientConfig:
    service:
      name: database-webhook
      namespace: db-system
      path: /validate
  rules:
  - operations: ["CREATE", "UPDATE"]
    apiGroups: ["db.example.com"]
    apiVersions: ["v1"]
    resources: ["databases"]
  admissionReviewVersions: ["v1", "v1beta1"]
  sideEffects: None
  failurePolicy: Fail

---
# Mutating webhook configuration
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: database-mutator
webhooks:
- name: database.mutator.example.com
  clientConfig:
    service:
      name: database-webhook
      namespace: db-system
      path: /mutate
  rules:
  - operations: ["CREATE", "UPDATE"]
    apiGroups: ["db.example.com"]
    apiVersions: ["v1"]
    resources: ["databases"]
  admissionReviewVersions: ["v1", "v1beta1"]
  sideEffects: None
  failurePolicy: Fail

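Both configurations still need clientConfig.caBundle, the base64-encoded CA that signed the webhook server's certificate; without it the API server cannot verify the TLS connection. One way to fill it in, assuming the CA is stored under ca.crt in a Secret named webhook-tls-certs in db-system (an assumption for this sketch; cert-manager's CA injector is a common alternative):

# Patch the CA bundle into the webhook configurations (sketch)
CA_BUNDLE=$(kubectl get secret webhook-tls-certs -n db-system -o jsonpath='{.data.ca\.crt}')
kubectl patch validatingwebhookconfiguration database-validator --type=json \
  -p "[{\"op\": \"add\", \"path\": \"/webhooks/0/clientConfig/caBundle\", \"value\": \"${CA_BUNDLE}\"}]"
kubectl patch mutatingwebhookconfiguration database-mutator --type=json \
  -p "[{\"op\": \"add\", \"path\": \"/webhooks/0/clientConfig/caBundle\", \"value\": \"${CA_BUNDLE}\"}]"
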
12.2.4 Using the Custom Resources

# Using the Database custom resource
apiVersion: db.example.com/v1
kind: Database
metadata:
  name: my-postgres
  namespace: production
spec:
  type: postgresql
  version: "13.7.0"
  storage:
    size: 100Gi
    storageClass: fast-ssd
  backup:
    enabled: true
    schedule: "0 2 * * *"  # daily at 02:00
    retention: 30
  monitoring:
    enabled: true
    metrics:
    - cpu
    - memory
    - disk
    - connections
    - queries

---
# WebApp custom resource example
apiVersion: example.com/v1
kind: WebApp
metadata:
  name: my-webapp
  namespace: default
spec:
  replicas: 3
  image: nginx:1.21
  port: 80
  resources:
    cpu: "500m"
    memory: "512Mi"

12.3 The Operator Pattern

12.3.1 Operator Basics

An Operator extends Kubernetes by combining custom resources with a controller that encodes the operational knowledge needed to manage an application.

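The controller below assumes Go API types for the WebApp resource from section 12.2.1, normally generated by kubebuilder or the Operator SDK. A minimal hand-written sketch of what it expects from the webappv1 package (field names are assumptions that mirror the CRD schema; the DeepCopy methods would be generated by controller-gen):

// Sketch of api/v1/webapp_types.go
package v1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// WebAppResources mirrors spec.resources in the CRD schema
type WebAppResources struct {
    CPU    string `json:"cpu,omitempty"`
    Memory string `json:"memory,omitempty"`
}

// WebAppSpec defines the desired state of a WebApp
type WebAppSpec struct {
    Replicas  int32           `json:"replicas"`
    Image     string          `json:"image"`
    Port      int32           `json:"port"`
    Resources WebAppResources `json:"resources,omitempty"`
}

// WebAppCondition is a single status condition
type WebAppCondition struct {
    Type               string      `json:"type"`
    Status             string      `json:"status"`
    Reason             string      `json:"reason,omitempty"`
    Message            string      `json:"message,omitempty"`
    LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
}

// WebAppStatus defines the observed state of a WebApp
type WebAppStatus struct {
    Phase         string            `json:"phase,omitempty"`
    ReadyReplicas int32             `json:"readyReplicas,omitempty"`
    Conditions    []WebAppCondition `json:"conditions,omitempty"`
}

// WebApp is the Schema for the webapps API
type WebApp struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   WebAppSpec   `json:"spec,omitempty"`
    Status WebAppStatus `json:"status,omitempty"`
}
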
// Operator controller example
package main

import (
    "context"
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/intstr"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    "sigs.k8s.io/controller-runtime/pkg/log"

    webappv1 "example.com/webapp-operator/api/v1"
)

// WebAppReconciler reconciles a WebApp object
type WebAppReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=example.com,resources=webapps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=example.com,resources=webapps/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=example.com,resources=webapps/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete

// Reconcile is part of the main kubernetes reconciliation loop
func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := log.FromContext(ctx)

    // Fetch the WebApp instance
    var webapp webappv1.WebApp
    if err := r.Get(ctx, req.NamespacedName, &webapp); err != nil {
        if errors.IsNotFound(err) {
            log.Info("WebApp resource not found. Ignoring since object must be deleted")
            return ctrl.Result{}, nil
        }
        log.Error(err, "Failed to get WebApp")
        return ctrl.Result{}, err
    }

    // Check whether the object is being deleted
    if webapp.ObjectMeta.DeletionTimestamp.IsZero() {
        // Add the finalizer if it is missing
        if !controllerutil.ContainsFinalizer(&webapp, "webapp.example.com/finalizer") {
            controllerutil.AddFinalizer(&webapp, "webapp.example.com/finalizer")
            return ctrl.Result{}, r.Update(ctx, &webapp)
        }
    } else {
        // Handle deletion
        if controllerutil.ContainsFinalizer(&webapp, "webapp.example.com/finalizer") {
            if err := r.doFinalizerOperationsForWebApp(&webapp); err != nil {
                return ctrl.Result{}, err
            }
            controllerutil.RemoveFinalizer(&webapp, "webapp.example.com/finalizer")
            return ctrl.Result{}, r.Update(ctx, &webapp)
        }
        return ctrl.Result{}, nil
    }

    // Reconcile the Deployment
    if err := r.reconcileDeployment(ctx, &webapp); err != nil {
        log.Error(err, "Failed to reconcile Deployment")
        return ctrl.Result{}, err
    }

    // Reconcile the Service
    if err := r.reconcileService(ctx, &webapp); err != nil {
        log.Error(err, "Failed to reconcile Service")
        return ctrl.Result{}, err
    }

    // Update the status
    if err := r.updateWebAppStatus(ctx, &webapp); err != nil {
        log.Error(err, "Failed to update WebApp status")
        return ctrl.Result{}, err
    }

    return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
}

// reconcileDeployment reconciles the Deployment owned by the WebApp
func (r *WebAppReconciler) reconcileDeployment(ctx context.Context, webapp *webappv1.WebApp) error {
    deployment := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      webapp.Name,
            Namespace: webapp.Namespace,
        },
    }

    op, err := controllerutil.CreateOrUpdate(ctx, r.Client, deployment, func() error {
        // Set the desired Deployment spec
        deployment.Spec = appsv1.DeploymentSpec{
            Replicas: &webapp.Spec.Replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{
                    "app": webapp.Name,
                },
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: map[string]string{
                        "app": webapp.Name,
                    },
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  "webapp",
                            Image: webapp.Spec.Image,
                            Ports: []corev1.ContainerPort{
                                {
                                    ContainerPort: webapp.Spec.Port,
                                    Protocol:      corev1.ProtocolTCP,
                                },
                            },
                            Resources: corev1.ResourceRequirements{
                                Requests: corev1.ResourceList{
                                    corev1.ResourceCPU:    resource.MustParse(webapp.Spec.Resources.CPU),
                                    corev1.ResourceMemory: resource.MustParse(webapp.Spec.Resources.Memory),
                                },
                                Limits: corev1.ResourceList{
                                    corev1.ResourceCPU:    resource.MustParse(webapp.Spec.Resources.CPU),
                                    corev1.ResourceMemory: resource.MustParse(webapp.Spec.Resources.Memory),
                                },
                            },
                        },
                    },
                },
            },
        }

        // Set the owner reference
        return controllerutil.SetControllerReference(webapp, deployment, r.Scheme)
    })

    if err != nil {
        return err
    }

    if op != controllerutil.OperationResultNone {
        log.FromContext(ctx).Info("Deployment reconciled", "operation", op)
    }

    return nil
}

// reconcileService reconciles the Service owned by the WebApp
func (r *WebAppReconciler) reconcileService(ctx context.Context, webapp *webappv1.WebApp) error {
    service := &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      webapp.Name,
            Namespace: webapp.Namespace,
        },
    }

    op, err := controllerutil.CreateOrUpdate(ctx, r.Client, service, func() error {
        // Set the desired Service spec
        service.Spec = corev1.ServiceSpec{
            Selector: map[string]string{
                "app": webapp.Name,
            },
            Ports: []corev1.ServicePort{
                {
                    Port:       80,
                    TargetPort: intstr.FromInt(int(webapp.Spec.Port)),
                    Protocol:   corev1.ProtocolTCP,
                },
            },
            Type: corev1.ServiceTypeClusterIP,
        }

        // Set the owner reference
        return controllerutil.SetControllerReference(webapp, service, r.Scheme)
    })

    if err != nil {
        return err
    }

    if op != controllerutil.OperationResultNone {
        log.FromContext(ctx).Info("Service reconciled", "operation", op)
    }

    return nil
}

// updateWebAppStatus updates the WebApp status
func (r *WebAppReconciler) updateWebAppStatus(ctx context.Context, webapp *webappv1.WebApp) error {
    // Read the Deployment status
    deployment := &appsv1.Deployment{}
    err := r.Get(ctx, client.ObjectKey{
        Namespace: webapp.Namespace,
        Name:      webapp.Name,
    }, deployment)
    if err != nil {
        return err
    }

    // Update the replica count and phase
    webapp.Status.ReadyReplicas = deployment.Status.ReadyReplicas
    
    if deployment.Status.ReadyReplicas == webapp.Spec.Replicas {
        webapp.Status.Phase = "Running"
    } else {
        webapp.Status.Phase = "Pending"
    }

    // Update the Ready condition
    now := metav1.Now()
    condition := webappv1.WebAppCondition{
        Type:               "Ready",
        Status:             "True",
        LastTransitionTime: now,
        Reason:             "DeploymentReady",
        Message:            "All replicas are ready",
    }

    if deployment.Status.ReadyReplicas != webapp.Spec.Replicas {
        condition.Status = "False"
        condition.Reason = "DeploymentNotReady"
        condition.Message = fmt.Sprintf("Ready replicas: %d/%d", deployment.Status.ReadyReplicas, webapp.Spec.Replicas)
    }

    // Update or append the condition
    webapp.Status.Conditions = updateCondition(webapp.Status.Conditions, condition)

    return r.Status().Update(ctx, webapp)
}

// doFinalizerOperationsForWebApp performs cleanup before the finalizer is removed
func (r *WebAppReconciler) doFinalizerOperationsForWebApp(webapp *webappv1.WebApp) error {
    // Perform cleanup here, e.g. releasing external resources or sending notifications.
    log.FromContext(context.Background()).Info("Performing finalizer operations for WebApp", "name", webapp.Name)
    return nil
}

// updateCondition replaces or appends a condition by type
func updateCondition(conditions []webappv1.WebAppCondition, newCondition webappv1.WebAppCondition) []webappv1.WebAppCondition {
    for i, condition := range conditions {
        if condition.Type == newCondition.Type {
            if condition.Status != newCondition.Status {
                conditions[i] = newCondition
            }
            return conditions
        }
    }
    return append(conditions, newCondition)
}

// SetupWithManager sets up the controller with the Manager.
func (r *WebAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&webappv1.WebApp{}).
        Owns(&appsv1.Deployment{}).
        Owns(&corev1.Service{}).
        Complete(r)
}

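To actually run the reconciler it must be registered with a controller-runtime manager. Scaffolding generates this wiring (next section), but a stripped-down main looks roughly like this (a sketch; webappv1.AddToScheme is the scheme registration that kubebuilder generates alongside the API types):

// Minimal operator entry point (sketch)
package main

import (
    "os"

    "k8s.io/apimachinery/pkg/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"

    webappv1 "example.com/webapp-operator/api/v1"
)

func main() {
    scheme := runtime.NewScheme()
    // Register built-in types and the WebApp types with the scheme
    _ = clientgoscheme.AddToScheme(scheme)
    _ = webappv1.AddToScheme(scheme)

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
    if err != nil {
        os.Exit(1)
    }

    if err := (&WebAppReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr); err != nil {
        os.Exit(1)
    }

    // Blocks until a termination signal is received
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        os.Exit(1)
    }
}
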
12.3.2 Using the Operator SDK

#!/bin/bash
# Operator SDK project bootstrap script

echo "=== Creating the Operator SDK project ==="

# 1. Initialize the project
echo "1. Initializing the Operator project:"
operator-sdk init --domain=example.com --repo=github.com/example/webapp-operator

# 2. Create the API
echo "2. Creating the API:"
operator-sdk create api --group=webapp --version=v1 --kind=WebApp --resource --controller

# 3. Generate the CRD manifests
echo "3. Generating CRD manifests:"
make manifests

# 4. Install the CRDs into the cluster
echo "4. Installing CRDs:"
make install

# 5. Run the controller locally
echo "5. Running the controller:"
make run

echo "=== Operator project created ==="

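After the controller runs correctly with make run, the same scaffolded Makefile builds the image and deploys the operator into the cluster. A typical follow-up (the image name is an example):

# Build, push and deploy the operator (sketch)
make docker-build docker-push IMG=registry.example.com/webapp-operator:v0.1.0
make deploy IMG=registry.example.com/webapp-operator:v0.1.0

# Remove the operator again when finished
make undeploy
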
12.3.3 Operator Deployment Configuration

# Operator Deployment manifest
apiVersion: apps/v1
kind: Deployment
metadata:
  name: webapp-operator-controller-manager
  namespace: webapp-operator-system
  labels:
    control-plane: controller-manager
spec:
  selector:
    matchLabels:
      control-plane: controller-manager
  replicas: 1
  template:
    metadata:
      labels:
        control-plane: controller-manager
    spec:
      securityContext:
        runAsNonRoot: true
      containers:
      - command:
        - /manager
        args:
        - --leader-elect
        image: webapp-operator:latest
        name: manager
        securityContext:
          allowPrivilegeEscalation: false
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8081
          initialDelaySeconds: 15
          periodSeconds: 20
        readinessProbe:
          httpGet:
            path: /readyz
            port: 8081
          initialDelaySeconds: 5
          periodSeconds: 10
        resources:
          limits:
            cpu: 500m
            memory: 128Mi
          requests:
            cpu: 10m
            memory: 64Mi
      serviceAccountName: webapp-operator-controller-manager
      terminationGracePeriodSeconds: 10

---
# ServiceAccount
apiVersion: v1
kind: ServiceAccount
metadata:
  name: webapp-operator-controller-manager
  namespace: webapp-operator-system

---
# ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: webapp-operator-manager-role
rules:
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - update
  - watch
- apiGroups:
  - apps
  resources:
  - deployments
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - update
  - watch
- apiGroups:
  - example.com
  resources:
  - webapps
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - update
  - watch
- apiGroups:
  - example.com
  resources:
  - webapps/finalizers
  verbs:
  - update
- apiGroups:
  - example.com
  resources:
  - webapps/status
  verbs:
  - get
  - patch
  - update

---
# ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: webapp-operator-manager-rolebinding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: webapp-operator-manager-role
subjects:
- kind: ServiceAccount
  name: webapp-operator-controller-manager
  namespace: webapp-operator-system

12.4 Admission Controllers

12.4.1 Validating Admission Controller

// Validating webhook server
package main

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "strings"
    "time"

    admissionv1 "k8s.io/api/admission/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/serializer"
    "k8s.io/klog/v2"
)

var (
    scheme = runtime.NewScheme()
    codecs = serializer.NewCodecFactory(scheme)
)

func init() {
    // Register the types the deserializer needs to understand
    _ = corev1.AddToScheme(scheme)
    _ = admissionv1.AddToScheme(scheme)
}

type WebhookServer struct {
    server *http.Server
}

// HTTP handler for Pod validation
func (ws *WebhookServer) validatePod(w http.ResponseWriter, r *http.Request) {
    var body []byte
    if r.Body != nil {
        if data, err := io.ReadAll(r.Body); err == nil {
            body = data
        }
    }

    // Check the Content-Type
    contentType := r.Header.Get("Content-Type")
    if contentType != "application/json" {
        klog.Errorf("contentType=%s, expect application/json", contentType)
        http.Error(w, "invalid Content-Type, want `application/json`", http.StatusUnsupportedMediaType)
        return
    }

    var admissionResponse *admissionv1.AdmissionResponse
    ar := admissionv1.AdmissionReview{}
    if _, _, err := codecs.UniversalDeserializer().Decode(body, nil, &ar); err != nil {
        klog.Errorf("Could not decode body: %v", err)
        admissionResponse = &admissionv1.AdmissionResponse{
            Result: &metav1.Status{
                Message: err.Error(),
            },
        }
    } else {
        admissionResponse = ws.validatePodAdmission(ar.Request)
    }

    admissionReview := admissionv1.AdmissionReview{
        TypeMeta: metav1.TypeMeta{
            APIVersion: "admission.k8s.io/v1",
            Kind:       "AdmissionReview",
        },
    }
    if admissionResponse != nil {
        admissionReview.Response = admissionResponse
        if ar.Request != nil {
            admissionReview.Response.UID = ar.Request.UID
        }
    }

    respBytes, _ := json.Marshal(admissionReview)
    w.Header().Set("Content-Type", "application/json")
    if _, err := w.Write(respBytes); err != nil {
        klog.Errorf("Could not write response: %v", err)
    }
}

// Pod validation admission logic
func (ws *WebhookServer) validatePodAdmission(req *admissionv1.AdmissionRequest) *admissionv1.AdmissionResponse {
    var pod corev1.Pod
    if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
        klog.Errorf("Could not unmarshal raw object: %v", err)
        return &admissionv1.AdmissionResponse{
            Result: &metav1.Status{
                Message: err.Error(),
            },
        }
    }

    allowed := true
    var result *metav1.Status
    message := ""

    // Rule 1: check the image registry
    for _, container := range pod.Spec.Containers {
        if !ws.isAllowedRegistry(container.Image) {
            allowed = false
            message = fmt.Sprintf("Image %s is not from allowed registry", container.Image)
            break
        }
    }

    // Rule 2: check the security context
    if allowed {
        if pod.Spec.SecurityContext != nil {
            if pod.Spec.SecurityContext.RunAsUser != nil && *pod.Spec.SecurityContext.RunAsUser == 0 {
                allowed = false
                message = "Running as root is not allowed"
            }
        }

        // Check container-level security contexts
        for _, container := range pod.Spec.Containers {
            if container.SecurityContext != nil {
                if container.SecurityContext.Privileged != nil && *container.SecurityContext.Privileged {
                    allowed = false
                    message = "Privileged containers are not allowed"
                    break
                }
                if container.SecurityContext.RunAsUser != nil && *container.SecurityContext.RunAsUser == 0 {
                    allowed = false
                    message = "Running as root is not allowed"
                    break
                }
            }
        }
    }

    // Rule 3: check resource limits and requests
    if allowed {
        for _, container := range pod.Spec.Containers {
            if container.Resources.Limits == nil {
                allowed = false
                message = "Resource limits must be specified"
                break
            }
            if container.Resources.Requests == nil {
                allowed = false
                message = "Resource requests must be specified"
                break
            }
        }
    }

    // Rule 4: check required labels
    if allowed {
        if pod.Labels == nil {
            allowed = false
            message = "Pod must have labels"
        } else {
            requiredLabels := []string{"app", "version", "environment"}
            for _, label := range requiredLabels {
                if _, exists := pod.Labels[label]; !exists {
                    allowed = false
                    message = fmt.Sprintf("Required label '%s' is missing", label)
                    break
                }
            }
        }
    }

    if !allowed {
        result = &metav1.Status{
            Message: message,
        }
    }

    return &admissionv1.AdmissionResponse{
        Allowed: allowed,
        Result:  result,
    }
}

// isAllowedRegistry checks whether the image comes from an allowed registry
func (ws *WebhookServer) isAllowedRegistry(image string) bool {
    allowedRegistries := []string{
        "docker.io",
        "gcr.io",
        "quay.io",
        "registry.k8s.io",
        "your-private-registry.com",
    }

    for _, registry := range allowedRegistries {
        if strings.HasPrefix(image, registry) {
            return true
        }
    }

    // If no registry is specified, the image implicitly comes from docker.io
    if !strings.Contains(image, "/") || !strings.Contains(strings.Split(image, "/")[0], ".") {
        return true
    }

    return false
}

func main() {
    certPath := "/etc/certs/tls.crt"
    keyPath := "/etc/certs/tls.key"

    mux := http.NewServeMux()
    webhookServer := &WebhookServer{
        server: &http.Server{
            Addr:    ":8443",
            Handler: mux,
        },
    }

    mux.HandleFunc("/validate", webhookServer.validatePod)
    mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("OK"))
    })

    klog.Info("Starting webhook server...")
    if err := webhookServer.server.ListenAndServeTLS(certPath, keyPath); err != nil {
        klog.Fatalf("Failed to start webhook server: %v", err)
    }
}

12.4.2 Mutating Admission Controller

// HTTP handler for Pod mutation
func (ws *WebhookServer) mutatePod(w http.ResponseWriter, r *http.Request) {
    var body []byte
    if r.Body != nil {
        if data, err := io.ReadAll(r.Body); err == nil {
            body = data
        }
    }

    var admissionResponse *admissionv1.AdmissionResponse
    ar := admissionv1.AdmissionReview{}
    if _, _, err := codecs.UniversalDeserializer().Decode(body, nil, &ar); err != nil {
        klog.Errorf("Could not decode body: %v", err)
        admissionResponse = &admissionv1.AdmissionResponse{
            Result: &metav1.Status{
                Message: err.Error(),
            },
        }
    } else {
        admissionResponse = ws.mutatePodAdmission(ar.Request)
    }

    admissionReview := admissionv1.AdmissionReview{
        TypeMeta: metav1.TypeMeta{
            APIVersion: "admission.k8s.io/v1",
            Kind:       "AdmissionReview",
        },
    }
    if admissionResponse != nil {
        admissionReview.Response = admissionResponse
        if ar.Request != nil {
            admissionReview.Response.UID = ar.Request.UID
        }
    }

    respBytes, _ := json.Marshal(admissionReview)
    w.Header().Set("Content-Type", "application/json")
    if _, err := w.Write(respBytes); err != nil {
        klog.Errorf("Could not write response: %v", err)
    }
}

// Pod mutation admission logic
func (ws *WebhookServer) mutatePodAdmission(req *admissionv1.AdmissionRequest) *admissionv1.AdmissionResponse {
    var pod corev1.Pod
    if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
        klog.Errorf("Could not unmarshal raw object: %v", err)
        return &admissionv1.AdmissionResponse{
            Result: &metav1.Status{
                Message: err.Error(),
            },
        }
    }

    var patches []map[string]interface{}

    // Mutation 1: ensure the labels map exists and add default labels
    if pod.Labels == nil {
        patches = append(patches, map[string]interface{}{
            "op":    "add",
            "path":  "/metadata/labels",
            "value": map[string]string{},
        })
    }

    // Add the default labels where missing
    defaultLabels := map[string]string{
        "managed-by": "admission-webhook",
        "version":    "v1.0.0",
    }

    for key, value := range defaultLabels {
        if pod.Labels == nil || pod.Labels[key] == "" {
            patches = append(patches, map[string]interface{}{
                "op":    "add",
                "path":  fmt.Sprintf("/metadata/labels/%s", strings.ReplaceAll(key, "/", "~1")),
                "value": value,
            })
        }
    }

    // Mutation 2: add default annotations
    if pod.Annotations == nil {
        patches = append(patches, map[string]interface{}{
            "op":    "add",
            "path":  "/metadata/annotations",
            "value": map[string]string{},
        })
    }

    patches = append(patches, map[string]interface{}{
        "op":    "add",
        "path":  "/metadata/annotations/mutated-by",
        "value": "admission-webhook",
    })

    patches = append(patches, map[string]interface{}{
        "op":    "add",
        "path":  "/metadata/annotations/mutation-timestamp",
        "value": time.Now().Format(time.RFC3339),
    })

    // Mutation 3: inject a sidecar container
    if ws.shouldInjectSidecar(&pod) {
        sidecarContainer := corev1.Container{
            Name:  "logging-sidecar",
            Image: "fluent/fluent-bit:1.9",
            VolumeMounts: []corev1.VolumeMount{
                {
                    Name:      "varlog",
                    MountPath: "/var/log",
                },
            },
            Resources: corev1.ResourceRequirements{
                Requests: corev1.ResourceList{
                    corev1.ResourceCPU:    resource.MustParse("100m"),
                    corev1.ResourceMemory: resource.MustParse("128Mi"),
                },
                Limits: corev1.ResourceList{
                    corev1.ResourceCPU:    resource.MustParse("200m"),
                    corev1.ResourceMemory: resource.MustParse("256Mi"),
                },
            },
        }

        patches = append(patches, map[string]interface{}{
            "op":    "add",
            "path":  "/spec/containers/-",
            "value": sidecarContainer,
        })

        // Add the shared log volume
        if len(pod.Spec.Volumes) == 0 {
            patches = append(patches, map[string]interface{}{
                "op":    "add",
                "path":  "/spec/volumes",
                "value": []corev1.Volume{},
            })
        }

        logVolume := corev1.Volume{
            Name: "varlog",
            VolumeSource: corev1.VolumeSource{
                EmptyDir: &corev1.EmptyDirVolumeSource{},
            },
        }

        patches = append(patches, map[string]interface{}{
            "op":    "add",
            "path":  "/spec/volumes/-",
            "value": logVolume,
        })
    }

    // Mutation 4: set a default security context
    for i, container := range pod.Spec.Containers {
        if container.SecurityContext == nil {
            patches = append(patches, map[string]interface{}{
                "op":   "add",
                "path": fmt.Sprintf("/spec/containers/%d/securityContext", i),
                "value": &corev1.SecurityContext{
                    RunAsNonRoot:             &[]bool{true}[0],
                    RunAsUser:                &[]int64{1000}[0],
                    AllowPrivilegeEscalation: &[]bool{false}[0],
                    ReadOnlyRootFilesystem:   &[]bool{true}[0],
                    Capabilities: &corev1.Capabilities{
                        Drop: []corev1.Capability{"ALL"},
                    },
                },
            })
        }
    }

    patchBytes, _ := json.Marshal(patches)
    
    return &admissionv1.AdmissionResponse{
        Allowed: true,
        Patch:   patchBytes,
        PatchType: func() *admissionv1.PatchType {
            pt := admissionv1.PatchTypeJSONPatch
            return &pt
        }(),
    }
}

// shouldInjectSidecar decides whether the sidecar should be injected
func (ws *WebhookServer) shouldInjectSidecar(pod *corev1.Pod) bool {
    // Check the opt-in annotation
    if pod.Annotations != nil {
        if inject, exists := pod.Annotations["sidecar.example.com/inject"]; exists {
            return inject == "true"
        }
    }

    // Check the namespace. This is simplified; a real implementation would look up namespace labels.
    if strings.HasPrefix(pod.Namespace, "prod-") {
        return true
    }

    return false
}

12.4.3 Webhook Deployment Configuration

# Webhook Deployment manifest
apiVersion: apps/v1
kind: Deployment
metadata:
  name: admission-webhook
  namespace: webhook-system
spec:
  replicas: 2
  selector:
    matchLabels:
      app: admission-webhook
  template:
    metadata:
      labels:
        app: admission-webhook
    spec:
      serviceAccountName: admission-webhook
      containers:
      - name: webhook
        image: admission-webhook:latest
        ports:
        - containerPort: 8443
          name: webhook-api
        volumeMounts:
        - name: webhook-tls-certs
          mountPath: /etc/certs
          readOnly: true
        env:
        - name: TLS_CERT_FILE
          value: /etc/certs/tls.crt
        - name: TLS_PRIVATE_KEY_FILE
          value: /etc/certs/tls.key
        livenessProbe:
          httpGet:
            path: /health
            port: 8443
            scheme: HTTPS
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8443
            scheme: HTTPS
          initialDelaySeconds: 5
          periodSeconds: 5
        resources:
          limits:
            cpu: 500m
            memory: 128Mi
          requests:
            cpu: 250m
            memory: 64Mi
      volumes:
      - name: webhook-tls-certs
        secret:
          secretName: webhook-tls-certs

---
apiVersion: v1
kind: Service
metadata:
  name: admission-webhook
  namespace: webhook-system
spec:
  selector:
    app: admission-webhook
  ports:
  - name: webhook-api
    port: 443
    targetPort: 8443
    protocol: TCP

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: admission-webhook
  namespace: webhook-system

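The Deployment and Service only run the webhook server; the API server still needs webhook configurations that point at it. A sketch for the Pod validation and mutation endpoints implemented above (caBundle omitted for brevity, and a namespaceSelector excludes system namespaces so the webhook cannot lock itself out):

# Webhook configurations for the Pod webhooks (sketch; caBundle must still be injected)
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: pod-policy-validator
webhooks:
- name: pods.validator.example.com
  clientConfig:
    service:
      name: admission-webhook
      namespace: webhook-system
      path: /validate
  rules:
  - operations: ["CREATE", "UPDATE"]
    apiGroups: [""]
    apiVersions: ["v1"]
    resources: ["pods"]
  namespaceSelector:
    matchExpressions:
    - key: kubernetes.io/metadata.name
      operator: NotIn
      values: ["webhook-system", "kube-system"]
  admissionReviewVersions: ["v1"]
  sideEffects: None
  failurePolicy: Fail

---
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: pod-defaulter
webhooks:
- name: pods.mutator.example.com
  clientConfig:
    service:
      name: admission-webhook
      namespace: webhook-system
      path: /mutate
  rules:
  - operations: ["CREATE"]
    apiGroups: [""]
    apiVersions: ["v1"]
    resources: ["pods"]
  namespaceSelector:
    matchExpressions:
    - key: kubernetes.io/metadata.name
      operator: NotIn
      values: ["webhook-system", "kube-system"]
  admissionReviewVersions: ["v1"]
  sideEffects: None
  failurePolicy: Ignore
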
12.5 Scheduler Extensions

12.5.1 Scheduling Framework Plugins

// Custom scheduler plugin
package main

import (
    "context"
    "fmt"
    "math"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/scheduler/framework"
    frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)

// NodeResourceFit is the plugin name
const NodeResourceFit = "NodeResourceFit"

// NodeResourceFitArgs holds the plugin arguments
type NodeResourceFitArgs struct {
    // CPU weight
    CPUWeight int64 `json:"cpuWeight,omitempty"`
    // Memory weight
    MemoryWeight int64 `json:"memoryWeight,omitempty"`
    // Ephemeral storage weight
    StorageWeight int64 `json:"storageWeight,omitempty"`
}

// NodeResourceFitPlugin is a custom scheduling plugin
type NodeResourceFitPlugin struct {
    args   *NodeResourceFitArgs
    handle framework.Handle
}

// Name returns the plugin name
func (pl *NodeResourceFitPlugin) Name() string {
    return NodeResourceFit
}

// Filter checks whether the node has enough free resources for the Pod
func (pl *NodeResourceFitPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NewStatus(framework.Error, "node not found")
    }

    // Compute the Pod's resource requests
    podRequest := pl.calculatePodResourceRequest(pod)
    
    // Compute the node's remaining allocatable resources
    nodeAllocatable := pl.calculateNodeAllocatable(nodeInfo)
    
    // Check CPU
    if podRequest.MilliCPU > nodeAllocatable.MilliCPU {
        return framework.NewStatus(framework.Unschedulable, 
            fmt.Sprintf("Insufficient CPU: requested %dm, available %dm", 
                podRequest.MilliCPU, nodeAllocatable.MilliCPU))
    }
    
    // Check memory
    if podRequest.Memory > nodeAllocatable.Memory {
        return framework.NewStatus(framework.Unschedulable, 
            fmt.Sprintf("Insufficient memory: requested %d, available %d", 
                podRequest.Memory, nodeAllocatable.Memory))
    }
    
    // Check ephemeral storage
    if podRequest.EphemeralStorage > nodeAllocatable.EphemeralStorage {
        return framework.NewStatus(framework.Unschedulable, 
            fmt.Sprintf("Insufficient storage: requested %d, available %d", 
                podRequest.EphemeralStorage, nodeAllocatable.EphemeralStorage))
    }

    return nil
}

// Score ranks nodes by their remaining resources after placing the Pod
func (pl *NodeResourceFitPlugin) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
    }

    // Compute the Pod's resource requests
    podRequest := pl.calculatePodResourceRequest(pod)
    
    // Compute the node's remaining allocatable resources
    nodeAllocatable := pl.calculateNodeAllocatable(nodeInfo)
    
    // Per-resource scores
    cpuScore := pl.calculateResourceScore(podRequest.MilliCPU, nodeAllocatable.MilliCPU)
    memoryScore := pl.calculateResourceScore(podRequest.Memory, nodeAllocatable.Memory)
    storageScore := pl.calculateResourceScore(podRequest.EphemeralStorage, nodeAllocatable.EphemeralStorage)
    
    // Weighted average
    totalWeight := pl.args.CPUWeight + pl.args.MemoryWeight + pl.args.StorageWeight
    if totalWeight == 0 {
        totalWeight = 3 // default weights
        pl.args.CPUWeight = 1
        pl.args.MemoryWeight = 1
        pl.args.StorageWeight = 1
    }
    
    score := (cpuScore*pl.args.CPUWeight + memoryScore*pl.args.MemoryWeight + storageScore*pl.args.StorageWeight) / totalWeight
    
    klog.V(4).Infof("Node %s score: CPU=%d, Memory=%d, Storage=%d, Final=%d", 
        nodeName, cpuScore, memoryScore, storageScore, score)
    
    return score, nil
}

// PreFilter pre-computes and caches the Pod's resource requests
func (pl *NodeResourceFitPlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
    // Store the Pod's resource requests in the cycle state to avoid recomputation
    podRequest := pl.calculatePodResourceRequest(pod)
    state.Write(framework.StateKey("podResourceRequest"), podRequest)
    return nil, nil
}

// PreFilterExtensions returns nil; this plugin has no AddPod/RemovePod callbacks
func (pl *NodeResourceFitPlugin) PreFilterExtensions() framework.PreFilterExtensions {
    return nil
}

// ScoreExtensions returns nil because this plugin does not normalize scores
func (pl *NodeResourceFitPlugin) ScoreExtensions() framework.ScoreExtensions {
    return nil
}

// ResourceRequest aggregates the resources considered by this plugin
type ResourceRequest struct {
    MilliCPU         int64
    Memory           int64
    EphemeralStorage int64
}

// Clone implements framework.StateData so ResourceRequest can be stored in the CycleState
func (r *ResourceRequest) Clone() framework.StateData {
    c := *r
    return &c
}

// calculatePodResourceRequest sums the resource requests of all containers
func (pl *NodeResourceFitPlugin) calculatePodResourceRequest(pod *v1.Pod) *ResourceRequest {
    request := &ResourceRequest{}
    
    for _, container := range pod.Spec.Containers {
        if container.Resources.Requests != nil {
            if cpu := container.Resources.Requests[v1.ResourceCPU]; !cpu.IsZero() {
                request.MilliCPU += cpu.MilliValue()
            }
            if memory := container.Resources.Requests[v1.ResourceMemory]; !memory.IsZero() {
                request.Memory += memory.Value()
            }
            if storage := container.Resources.Requests[v1.ResourceEphemeralStorage]; !storage.IsZero() {
                request.EphemeralStorage += storage.Value()
            }
        }
    }
    
    return request
}

// calculateNodeAllocatable computes allocatable minus the requests of Pods already on the node
func (pl *NodeResourceFitPlugin) calculateNodeAllocatable(nodeInfo *framework.NodeInfo) *ResourceRequest {
    allocatable := &ResourceRequest{}
    
    node := nodeInfo.Node()
    if node.Status.Allocatable != nil {
        if cpu := node.Status.Allocatable[v1.ResourceCPU]; !cpu.IsZero() {
            allocatable.MilliCPU = cpu.MilliValue()
        }
        if memory := node.Status.Allocatable[v1.ResourceMemory]; !memory.IsZero() {
            allocatable.Memory = memory.Value()
        }
        if storage := node.Status.Allocatable[v1.ResourceEphemeralStorage]; !storage.IsZero() {
            allocatable.EphemeralStorage = storage.Value()
        }
    }
    
    // Subtract resources already requested by Pods on the node
    for _, podInfo := range nodeInfo.Pods {
        pod := podInfo.Pod
        for _, container := range pod.Spec.Containers {
            if container.Resources.Requests != nil {
                if cpu := container.Resources.Requests[v1.ResourceCPU]; !cpu.IsZero() {
                    allocatable.MilliCPU -= cpu.MilliValue()
                }
                if memory := container.Resources.Requests[v1.ResourceMemory]; !memory.IsZero() {
                    allocatable.Memory -= memory.Value()
                }
                if storage := container.Resources.Requests[v1.ResourceEphemeralStorage]; !storage.IsZero() {
                    allocatable.EphemeralStorage -= storage.Value()
                }
            }
        }
    }
    
    return allocatable
}

// calculateResourceScore returns a 0-100 score; more remaining resources means a higher score
func (pl *NodeResourceFitPlugin) calculateResourceScore(requested, available int64) int64 {
    if available <= 0 {
        return 0
    }
    
    if requested <= 0 {
        return 100
    }
    
    // Remaining resources after placing the Pod
    remaining := available - requested
    if remaining < 0 {
        return 0
    }
    
    // The more resources remain, the higher the score
    score := int64(math.Round(float64(remaining) / float64(available) * 100))
    if score > 100 {
        score = 100
    }
    
    return score
}

// New creates a plugin instance from the configured arguments
func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) {
    nodeResourceFitArgs := &NodeResourceFitArgs{
        CPUWeight:     1,
        MemoryWeight:  1,
        StorageWeight: 1,
    }

    if args != nil {
        if err := frameworkruntime.DecodeInto(args, nodeResourceFitArgs); err != nil {
            return nil, fmt.Errorf("failed to decode NodeResourceFitArgs: %v", err)
        }
    }

    return &NodeResourceFitPlugin{
        args:   nodeResourceFitArgs,
        handle: handle,
    }, nil
}

// Compile-time assertions that the plugin implements the required interfaces
var _ framework.FilterPlugin = &NodeResourceFitPlugin{}
var _ framework.ScorePlugin = &NodeResourceFitPlugin{}
var _ framework.PreFilterPlugin = &NodeResourceFitPlugin{}

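Scheduler plugins are not loaded dynamically; they are compiled into a scheduler binary that registers them by name. A minimal entry point, in the same package as the plugin above and built on the kube-scheduler app package, might look like this (a sketch; the resulting image is what the Deployment in the next section runs):

// Custom scheduler binary that registers the NodeResourceFit plugin (sketch)
import (
    "os"

    "k8s.io/kubernetes/cmd/kube-scheduler/app"
)

func main() {
    // Register the plugin factory under the name used in KubeSchedulerConfiguration
    command := app.NewSchedulerCommand(
        app.WithPlugin(NodeResourceFit, New),
    )
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}
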
12.5.2 Scheduler Configuration

# Custom scheduler configuration
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
- schedulerName: custom-scheduler
  plugins:
    # PreFilter stage
    preFilter:
      enabled:
      - name: NodeResourceFit
      - name: NodeAffinity
      - name: PodTopologySpread
      disabled:
      - name: NodeResourcesFit  # disable the default plugin
    
    # Filter stage
    filter:
      enabled:
      - name: NodeResourceFit
      - name: NodeAffinity
      - name: PodTopologySpread
      - name: TaintToleration
      disabled:
      - name: NodeResourcesFit
    
    # Score stage
    score:
      enabled:
      - name: NodeResourceFit
        weight: 50
      - name: NodeAffinity
        weight: 30
      - name: PodTopologySpread
        weight: 20
      disabled:
      - name: NodeResourcesFit
  
  # Plugin arguments
  pluginConfig:
  - name: NodeResourceFit
    args:
      cpuWeight: 3
      memoryWeight: 2
      storageWeight: 1
  
  - name: PodTopologySpread
    args:
      defaultConstraints:
      - maxSkew: 1
        topologyKey: kubernetes.io/hostname
        whenUnsatisfiable: DoNotSchedule
      - maxSkew: 1
        topologyKey: topology.kubernetes.io/zone
        whenUnsatisfiable: ScheduleAnyway

---
# Scheduler Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: custom-scheduler
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: custom-scheduler
  template:
    metadata:
      labels:
        app: custom-scheduler
    spec:
      serviceAccountName: custom-scheduler
      containers:
      - name: kube-scheduler
        image: custom-scheduler:v1.28.0   # image built from the plugin code above, not the stock kube-scheduler
        command:
        - kube-scheduler
        - --config=/etc/kubernetes/scheduler-config.yaml
        - --v=2
        volumeMounts:
        - name: config
          mountPath: /etc/kubernetes
        - name: plugin
          mountPath: /opt/plugins
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi
      volumes:
      - name: config
        configMap:
          name: scheduler-config
      - name: plugin
        hostPath:
          path: /opt/scheduler-plugins

---
# Scheduler ServiceAccount
apiVersion: v1
kind: ServiceAccount
metadata:
  name: custom-scheduler
  namespace: kube-system

---
# Scheduler ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: custom-scheduler
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: system:kube-scheduler
subjects:
- kind: ServiceAccount
  name: custom-scheduler
  namespace: kube-system

12.5.3 Using the Custom Scheduler

# Pod using the custom scheduler
apiVersion: v1
kind: Pod
metadata:
  name: test-custom-scheduler
spec:
  schedulerName: custom-scheduler  # select the custom scheduler
  containers:
  - name: app
    image: nginx:1.21
    resources:
      requests:
        cpu: 100m
        memory: 128Mi
      limits:
        cpu: 200m
        memory: 256Mi

---
# Deployment using the custom scheduler
apiVersion: apps/v1
kind: Deployment
metadata:
  name: app-with-custom-scheduler
spec:
  replicas: 3
  selector:
    matchLabels:
      app: custom-scheduled-app
  template:
    metadata:
      labels:
        app: custom-scheduled-app
    spec:
      schedulerName: custom-scheduler
      containers:
      - name: app
        image: nginx:1.21
        resources:
          requests:
            cpu: 200m
            memory: 256Mi
          limits:
            cpu: 500m
            memory: 512Mi

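Whether the custom scheduler actually placed a Pod can be checked from the Pod spec and its scheduling events:

# Verify which scheduler handled the Pod
kubectl get pod test-custom-scheduler -o jsonpath='{.spec.schedulerName}{"\n"}'
kubectl get events --field-selector involvedObject.name=test-custom-scheduler,reason=Scheduled
kubectl describe pod test-custom-scheduler    # the Scheduled event names the scheduler
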
12.6 Network Plugin Development

12.6.1 CNI Plugin Basics

// Simple CNI plugin example
package main

import (
    "encoding/json"
    "fmt"
    "net"
    "runtime"

    "github.com/containernetworking/cni/pkg/skel"
    "github.com/containernetworking/cni/pkg/types"
    current "github.com/containernetworking/cni/pkg/types/100"
    "github.com/containernetworking/cni/pkg/version"
    "github.com/containernetworking/plugins/pkg/ip"
    "github.com/containernetworking/plugins/pkg/ipam"
    "github.com/containernetworking/plugins/pkg/ns"
    "github.com/vishvananda/netlink"
)

// NetConf is the CNI network configuration
type NetConf struct {
    types.NetConf
    Bridge   string `json:"bridge"`
    IsGW     bool   `json:"isGateway"`
    IsDefaultGW bool `json:"isDefaultGateway"`
    ForceAddress bool `json:"forceAddress"`
    IPMasq   bool   `json:"ipMasq"`
    MTU      int    `json:"mtu"`
    HairpinMode bool `json:"hairpinMode"`
}

// cmdAdd sets up the container network interface
func cmdAdd(args *skel.CmdArgs) error {
    n, cniVersion, err := loadNetConf(args.StdinData)
    if err != nil {
        return err
    }

    // Create or look up the bridge
    br, err := setupBridge(n)
    if err != nil {
        return err
    }

    netns, err := ns.GetNS(args.Netns)
    if err != nil {
        return fmt.Errorf("failed to open netns %q: %v", args.Netns, err)
    }
    defer netns.Close()

    // Allocate an IP address via the configured IPAM plugin
    ipamResult, err := ipam.ExecAdd(n.IPAM.Type, args.StdinData)
    if err != nil {
        return err
    }
    r, err := current.NewResultFromResult(ipamResult)
    if err != nil {
        return err
    }

    // Create the veth pair
    hostInterface, containerInterface, err := setupVeth(netns, br, args.IfName, n.MTU, n.HairpinMode)
    if err != nil {
        return err
    }

    // Configure the interface inside the container namespace
    err = netns.Do(func(hostNS ns.NetNS) error {
        return configureInterface(args.IfName, r.IPs, r.Routes)
    })
    if err != nil {
        return err
    }

    // Assign the gateway address to the bridge if acting as a gateway
    if n.IsGW {
        err = ensureBridgeAddr(br, r.IPs, n.ForceAddress)
        if err != nil {
            return err
        }
    }

    // Configure IP forwarding and masquerading
    if n.IPMasq {
        err = setupIPMasq(r.IPs)
        if err != nil {
            return err
        }
    }

    result := &current.Result{
        CNIVersion: cniVersion,
        IPs:        r.IPs,
        Routes:     r.Routes,
        DNS:        r.DNS,
        Interfaces: []*current.Interface{
            hostInterface,
            containerInterface,
        },
    }

    return types.PrintResult(result, cniVersion)
}

// cmdDel tears down the container network interface
func cmdDel(args *skel.CmdArgs) error {
    n, _, err := loadNetConf(args.StdinData)
    if err != nil {
        return err
    }

    // Release the IP address
    err = ipam.ExecDel(n.IPAM.Type, args.StdinData)
    if err != nil {
        return err
    }

    // Delete the veth interface
    if args.Netns == "" {
        return nil
    }

    err = ns.WithNetNSPath(args.Netns, func(hostNS ns.NetNS) error {
        return ip.DelLinkByName(args.IfName)
    })
    if err != nil && err != ip.ErrLinkNotFound {
        return err
    }

    return nil
}

// cmdCheck verifies that the container network interface is still configured
func cmdCheck(args *skel.CmdArgs) error {
    // Interface check logic would go here
    return nil
}

// loadNetConf parses the network configuration from stdin
func loadNetConf(bytes []byte) (*NetConf, string, error) {
    n := &NetConf{}
    if err := json.Unmarshal(bytes, n); err != nil {
        return nil, "", fmt.Errorf("failed to load netconf: %v", err)
    }

    return n, n.CNIVersion, nil
}

// setupBridge creates the bridge if it does not exist and brings it up
func setupBridge(n *NetConf) (*netlink.Bridge, error) {
    // Check whether the bridge already exists
    l, err := netlink.LinkByName(n.Bridge)
    if err != nil {
        // Create the bridge
        br := &netlink.Bridge{
            LinkAttrs: netlink.LinkAttrs{
                Name: n.Bridge,
                MTU:  n.MTU,
            },
        }
        
        if err := netlink.LinkAdd(br); err != nil {
            return nil, fmt.Errorf("could not add %q: %v", n.Bridge, err)
        }
        
        l = br
    }

    br, ok := l.(*netlink.Bridge)
    if !ok {
        return nil, fmt.Errorf("%q already exists but is not a bridge", n.Bridge)
    }

    // Bring the bridge up
    if err := netlink.LinkSetUp(br); err != nil {
        return nil, err
    }

    return br, nil
}

// setupVeth creates the veth pair and attaches the host end to the bridge
func setupVeth(netns ns.NetNS, br *netlink.Bridge, ifName string, mtu int, hairpinMode bool) (*current.Interface, *current.Interface, error) {
    hostIface := &current.Interface{}
    containerIface := &current.Interface{}

    err := netns.Do(func(hostNS ns.NetNS) error {
        // Create the veth pair inside the container namespace; the host end is moved to hostNS
        hostVeth, containerVeth, err := ip.SetupVeth(ifName, mtu, hostNS)
        if err != nil {
            return err
        }

        hostIface.Name = hostVeth.Name
        containerIface.Name = containerVeth.Name
        containerIface.Mac = containerVeth.HardwareAddr.String()
        containerIface.Sandbox = netns.Path()
        return nil
    })
    if err != nil {
        return nil, nil, err
    }

    // Look up the host end in the host namespace
    hostVeth, err := netlink.LinkByName(hostIface.Name)
    if err != nil {
        return nil, nil, fmt.Errorf("failed to lookup %q: %v", hostIface.Name, err)
    }
    hostIface.Mac = hostVeth.Attrs().HardwareAddr.String()

    // Attach the host end of the veth to the bridge
    if err := netlink.LinkSetMaster(hostVeth, br); err != nil {
        return nil, nil, fmt.Errorf("failed to connect %q to bridge %v: %v", hostIface.Name, br.Attrs().Name, err)
    }

    // Enable hairpin mode if requested
    if hairpinMode {
        if err := netlink.LinkSetHairpin(hostVeth, true); err != nil {
            return nil, nil, fmt.Errorf("failed to setup hairpin mode for %v: %v", hostIface.Name, err)
        }
    }

    return hostIface, containerIface, nil
}

// configureInterface assigns the allocated IPs and routes to the container-side interface
func configureInterface(ifName string, ips []*current.IPConfig, routes []*types.Route) error {
    link, err := netlink.LinkByName(ifName)
    if err != nil {
        return fmt.Errorf("failed to lookup %q: %v", ifName, err)
    }

    // Assign IP addresses
    for _, ipc := range ips {
        addr := &netlink.Addr{IPNet: &ipc.Address, Label: ""}
        if err = netlink.AddrAdd(link, addr); err != nil {
            return fmt.Errorf("failed to add IP addr %v to %q: %v", ipc.Address, ifName, err)
        }
    }

    // Bring the interface up
    if err = netlink.LinkSetUp(link); err != nil {
        return fmt.Errorf("failed to set %q UP: %v", ifName, err)
    }

    // Install routes
    for _, r := range routes {
        route := &netlink.Route{
            LinkIndex: link.Attrs().Index,
            Scope:     netlink.SCOPE_UNIVERSE,
            Dst:       &r.Dst,
            Gw:        r.GW,
        }
        if err = netlink.RouteAdd(route); err != nil {
            return fmt.Errorf("failed to add route %v: %v", r, err)
        }
    }

    return nil
}

// ensureBridgeAddr makes sure the bridge has a gateway IP address
func ensureBridgeAddr(br *netlink.Bridge, ips []*current.IPConfig, forceAddress bool) error {
    addrs, err := netlink.AddrList(br, netlink.FAMILY_V4)
    if err != nil && err != netlink.ErrNotImplemented {
        return fmt.Errorf("could not get list of IP addresses: %v", err)
    }

    // Skip if the bridge already has an address and forceAddress is not set
    if len(addrs) > 0 && !forceAddress {
        return nil
    }

    // Assign the gateway address to the bridge
    for _, ipc := range ips {
        gatewayAddr := &net.IPNet{
            IP:   ipc.Gateway,
            Mask: ipc.Address.Mask,
        }
        addr := &netlink.Addr{IPNet: gatewayAddr, Label: ""}
        if err = netlink.AddrAdd(br, addr); err != nil {
            return fmt.Errorf("failed to add IP addr %v to %q: %v", gatewayAddr, br.Attrs().Name, err)
        }
    }

    return nil
}

// setupIPMasq enables IP masquerading for the allocated addresses
func setupIPMasq(ips []*current.IPConfig) error {
    // Enable IP forwarding
    if err := ip.EnableIP4Forward(); err != nil {
        return fmt.Errorf("failed to enable forwarding: %v", err)
    }

    // The iptables masquerade rules are omitted here for brevity.
    // A real implementation would install them with an iptables library.
    return nil
}

func main() {
    skel.PluginMain(cmdAdd, cmdCheck, cmdDel, version.All, "Simple CNI plugin v0.1.0")
}

func init() {
    // 锁定OS线程
    runtime.LockOSThread()
}

12.6.2 CNI插件配置

{
  "cniVersion": "0.4.0",
  "name": "simple-bridge",
  "type": "simple-bridge",
  "bridge": "cni0",
  "isGateway": true,
  "isDefaultGateway": true,
  "forceAddress": false,
  "ipMasq": true,
  "mtu": 1500,
  "hairpinMode": true,
  "ipam": {
    "type": "host-local",
    "subnet": "10.244.0.0/16",
    "routes": [
      {
        "dst": "0.0.0.0/0"
      }
    ]
  }
}

# CNI插件DaemonSet部署
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: simple-cni
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: simple-cni
  template:
    metadata:
      labels:
        app: simple-cni
    spec:
      hostNetwork: true
      tolerations:
      - operator: Exists
        effect: NoSchedule
      serviceAccountName: simple-cni
      containers:
      - name: install-cni
        image: simple-cni:latest
        command: ["/install-cni.sh"]
        env:
        - name: CNI_NETWORK_CONFIG
          valueFrom:
            configMapKeyRef:
              name: simple-cni-config
              key: cni_network_config
        volumeMounts:
        - name: cni-bin-dir
          mountPath: /host/opt/cni/bin
        - name: cni-net-dir
          mountPath: /host/etc/cni/net.d
        securityContext:
          privileged: true
      volumes:
      - name: cni-bin-dir
        hostPath:
          path: /opt/cni/bin
      - name: cni-net-dir
        hostPath:
          path: /etc/cni/net.d

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: simple-cni-config
  namespace: kube-system
data:
  cni_network_config: |
    {
      "cniVersion": "0.4.0",
      "name": "simple-bridge",
      "type": "simple-bridge",
      "bridge": "cni0",
      "isGateway": true,
      "isDefaultGateway": true,
      "forceAddress": false,
      "ipMasq": true,
      "mtu": 1500,
      "hairpinMode": true,
      "ipam": {
        "type": "host-local",
        "subnet": "10.244.0.0/16",
        "routes": [
          {
            "dst": "0.0.0.0/0"
          }
        ]
      }
    }

12.7 存储插件开发

12.7.1 CSI驱动开发

// CSI驱动示例
package main

import (
    "context"
    "fmt"
    "os"
    "path/filepath"
    "strings"

    "github.com/container-storage-interface/spec/lib/go/csi"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "k8s.io/klog/v2"
)

// Driver CSI驱动结构
type Driver struct {
    name    string
    version string
    nodeID  string
    
    // 服务能力
    vcap  map[csi.VolumeCapability_AccessMode_Mode]bool
    cscap []*csi.ControllerServiceCapability
    nscap []*csi.NodeServiceCapability
}

// NewDriver 创建新的CSI驱动
func NewDriver(name, version, nodeID string) *Driver {
    d := &Driver{
        name:    name,
        version: version,
        nodeID:  nodeID,
        vcap:    map[csi.VolumeCapability_AccessMode_Mode]bool{},
    }

    // 设置支持的访问模式
    d.vcap[csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER] = true
    d.vcap[csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY] = true
    d.vcap[csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY] = true

    // 设置控制器服务能力
    d.cscap = []*csi.ControllerServiceCapability{
        {
            Type: &csi.ControllerServiceCapability_Rpc{
                Rpc: &csi.ControllerServiceCapability_RPC{
                    Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
                },
            },
        },
        {
            Type: &csi.ControllerServiceCapability_Rpc{
                Rpc: &csi.ControllerServiceCapability_RPC{
                    Type: csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
                },
            },
        },
    }

    // 设置节点服务能力
    d.nscap = []*csi.NodeServiceCapability{
        {
            Type: &csi.NodeServiceCapability_Rpc{
                Rpc: &csi.NodeServiceCapability_RPC{
                    Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
                },
            },
        },
    }

    return d
}

// Identity Service 实现

// GetPluginInfo 获取插件信息
func (d *Driver) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
    return &csi.GetPluginInfoResponse{
        Name:          d.name,
        VendorVersion: d.version,
    }, nil
}

// GetPluginCapabilities 获取插件能力
func (d *Driver) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
    return &csi.GetPluginCapabilitiesResponse{
        Capabilities: []*csi.PluginCapability{
            {
                Type: &csi.PluginCapability_Service_{
                    Service: &csi.PluginCapability_Service{
                        Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
                    },
                },
            },
        },
    }, nil
}

// Probe 健康检查
func (d *Driver) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
    return &csi.ProbeResponse{}, nil
}

// Controller Service 实现

// CreateVolume 创建卷
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
    if len(req.GetName()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Volume name missing in request")
    }

    if req.GetVolumeCapabilities() == nil {
        return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
    }

    // 验证卷能力
    for _, cap := range req.GetVolumeCapabilities() {
        if cap.GetAccessMode() == nil {
            return nil, status.Error(codes.InvalidArgument, "Volume access mode missing")
        }
        if !d.vcap[cap.GetAccessMode().GetMode()] {
            return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("Unsupported access mode: %v", cap.GetAccessMode().GetMode()))
        }
    }

    // 获取卷大小
    size := req.GetCapacityRange().GetRequiredBytes()
    if size == 0 {
        size = 1 * 1024 * 1024 * 1024 // 默认1GB
    }

    // 创建卷(这里简化为创建目录)
    volumePath := filepath.Join("/var/lib/csi-volumes", req.GetName())
    if err := os.MkdirAll(volumePath, 0755); err != nil {
        return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to create volume directory: %v", err))
    }

    klog.Infof("Created volume %s at %s with size %d", req.GetName(), volumePath, size)

    return &csi.CreateVolumeResponse{
        Volume: &csi.Volume{
            VolumeId:      req.GetName(),
            CapacityBytes: size,
            VolumeContext: req.GetParameters(),
        },
    }, nil
}

// DeleteVolume 删除卷
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
    if len(req.GetVolumeId()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
    }

    volumePath := filepath.Join("/var/lib/csi-volumes", req.GetVolumeId())
    if err := os.RemoveAll(volumePath); err != nil {
        return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to delete volume: %v", err))
    }

    klog.Infof("Deleted volume %s", req.GetVolumeId())

    return &csi.DeleteVolumeResponse{}, nil
}

// ControllerPublishVolume 发布卷到节点
func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
    if len(req.GetVolumeId()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
    }

    if len(req.GetNodeId()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Node ID missing in request")
    }

    // 这里可以实现卷的发布逻辑,比如挂载到特定节点
    klog.Infof("Published volume %s to node %s", req.GetVolumeId(), req.GetNodeId())

    return &csi.ControllerPublishVolumeResponse{}, nil
}

// ControllerUnpublishVolume 从节点取消发布卷
func (d *Driver) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
    if len(req.GetVolumeId()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
    }

    klog.Infof("Unpublished volume %s from node %s", req.GetVolumeId(), req.GetNodeId())

    return &csi.ControllerUnpublishVolumeResponse{}, nil
}

// ValidateVolumeCapabilities 验证卷能力
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
    if len(req.GetVolumeId()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
    }

    if req.GetVolumeCapabilities() == nil {
        return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
    }

    // 验证每个能力
    for _, cap := range req.GetVolumeCapabilities() {
        if cap.GetAccessMode() == nil {
            return nil, status.Error(codes.InvalidArgument, "Volume access mode missing")
        }
        if !d.vcap[cap.GetAccessMode().GetMode()] {
            return &csi.ValidateVolumeCapabilitiesResponse{
                Message: fmt.Sprintf("Unsupported access mode: %v", cap.GetAccessMode().GetMode()),
            }, nil
        }
    }

    return &csi.ValidateVolumeCapabilitiesResponse{
        Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
            VolumeCapabilities: req.GetVolumeCapabilities(),
        },
    }, nil
}

// ListVolumes 列出卷
func (d *Driver) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
    return nil, status.Error(codes.Unimplemented, "ListVolumes not implemented")
}

// GetCapacity 获取容量
func (d *Driver) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
    return nil, status.Error(codes.Unimplemented, "GetCapacity not implemented")
}

// ControllerGetCapabilities 获取控制器能力
func (d *Driver) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
    return &csi.ControllerGetCapabilitiesResponse{
        Capabilities: d.cscap,
    }, nil
}

// CreateSnapshot 创建快照
func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
    return nil, status.Error(codes.Unimplemented, "CreateSnapshot not implemented")
}

// DeleteSnapshot 删除快照
func (d *Driver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
    return nil, status.Error(codes.Unimplemented, "DeleteSnapshot not implemented")
}

// ListSnapshots 列出快照
func (d *Driver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
    return nil, status.Error(codes.Unimplemented, "ListSnapshots not implemented")
}

// ControllerExpandVolume 扩展卷
func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
    return nil, status.Error(codes.Unimplemented, "ControllerExpandVolume not implemented")
}

// Node Service 实现

// NodeStageVolume 暂存卷
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
    if len(req.GetVolumeId()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
    }

    if len(req.GetStagingTargetPath()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request")
    }

    // 创建暂存目录
    if err := os.MkdirAll(req.GetStagingTargetPath(), 0755); err != nil {
        return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to create staging directory: %v", err))
    }

    klog.Infof("Staged volume %s at %s", req.GetVolumeId(), req.GetStagingTargetPath())

    return &csi.NodeStageVolumeResponse{}, nil
}

// NodeUnstageVolume 取消暂存卷
func (d *Driver) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
    if len(req.GetVolumeId()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
    }

    if len(req.GetStagingTargetPath()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request")
    }

    klog.Infof("Unstaged volume %s from %s", req.GetVolumeId(), req.GetStagingTargetPath())

    return &csi.NodeUnstageVolumeResponse{}, nil
}

// NodePublishVolume 发布卷到Pod
func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
    if len(req.GetVolumeId()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
    }

    if len(req.GetTargetPath()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
    }

    // 创建目标目录
    if err := os.MkdirAll(req.GetTargetPath(), 0755); err != nil {
        return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to create target directory: %v", err))
    }

    // 准备绑定挂载的源路径(实际的bind mount操作见本函数后的bindMountVolume示意)
    volumePath := filepath.Join("/var/lib/csi-volumes", req.GetVolumeId())

    klog.Infof("Published volume %s (source %s) to %s", req.GetVolumeId(), volumePath, req.GetTargetPath())

    return &csi.NodePublishVolumeResponse{}, nil
}
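
NodePublishVolume 中的绑定挂载被简化掉了。下面给出一个基于 k8s.io/mount-utils 的bind mount辅助函数示意(假设性的补充实现,需额外引入该依赖),可在NodePublishVolume中以 bindMountVolume(volumePath, req.GetTargetPath()) 的方式调用:

// bindMountVolume 将卷源目录以bind方式挂载到目标路径(示意实现)
// 需要额外引入 mount "k8s.io/mount-utils"
func bindMountVolume(sourcePath, targetPath string) error {
    mounter := mount.New("")

    // 幂等处理:目标路径已经是挂载点则直接返回
    notMnt, err := mounter.IsLikelyNotMountPoint(targetPath)
    if err != nil {
        if !os.IsNotExist(err) {
            return err
        }
        notMnt = true
    }
    if !notMnt {
        return nil
    }

    // 不指定文件系统类型,以bind选项挂载
    return mounter.Mount(sourcePath, targetPath, "", []string{"bind"})
}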

// NodeUnpublishVolume 从Pod取消发布卷
func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
    if len(req.GetVolumeId()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
    }

    if len(req.GetTargetPath()) == 0 {
        return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
    }

    klog.Infof("Unpublished volume %s from %s", req.GetVolumeId(), req.GetTargetPath())

    return &csi.NodeUnpublishVolumeResponse{}, nil
}

// NodeGetVolumeStats 获取卷统计信息
func (d *Driver) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
    return nil, status.Error(codes.Unimplemented, "NodeGetVolumeStats not implemented")
}

// NodeExpandVolume 扩展节点卷
func (d *Driver) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
    return nil, status.Error(codes.Unimplemented, "NodeExpandVolume not implemented")
}

// NodeGetCapabilities 获取节点能力
func (d *Driver) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
    return &csi.NodeGetCapabilitiesResponse{
        Capabilities: d.nscap,
    }, nil
}

// NodeGetInfo 获取节点信息
func (d *Driver) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
    return &csi.NodeGetInfoResponse{
        NodeId: d.nodeID,
    }, nil
}
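
上述代码只给出了Identity/Controller/Node三个服务的实现,还需要一个入口函数把它们注册到gRPC服务器,并监听部署清单中通过 --endpoint 与 --nodeid 传入的参数。下面是一个最小化的main函数示意(假设性实现,需额外引入 flag、net 与 google.golang.org/grpc;依据所用CSI spec版本,ControllerServer接口可能还要求补齐ControllerGetVolume等方法,同样可返回Unimplemented):

// main 入口函数示意:在unix socket上同时注册三类CSI服务
// (省略了信号处理与优雅退出)
func main() {
    endpoint := flag.String("endpoint", "unix:///csi/csi.sock", "CSI endpoint")
    nodeID := flag.String("nodeid", "", "node id")
    flag.Parse()

    driver := NewDriver("simple-csi", "0.1.0", *nodeID)

    // 去掉unix://前缀并清理残留的socket文件
    sockPath := strings.TrimPrefix(*endpoint, "unix://")
    if err := os.Remove(sockPath); err != nil && !os.IsNotExist(err) {
        klog.Fatalf("Failed to remove old socket %s: %v", sockPath, err)
    }

    listener, err := net.Listen("unix", sockPath)
    if err != nil {
        klog.Fatalf("Failed to listen on %s: %v", sockPath, err)
    }

    // 在同一个gRPC服务器上注册Identity/Controller/Node服务
    server := grpc.NewServer()
    csi.RegisterIdentityServer(server, driver)
    csi.RegisterControllerServer(server, driver)
    csi.RegisterNodeServer(server, driver)

    klog.Infof("Serving CSI driver %s on %s", driver.name, sockPath)
    if err := server.Serve(listener); err != nil {
        klog.Fatalf("gRPC server error: %v", err)
    }
}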

12.7.2 CSI驱动部署

# CSI驱动部署配置
apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
  name: simple-csi
spec:
  attachRequired: false
  podInfoOnMount: true
  volumeLifecycleModes:
  - Persistent
  - Ephemeral

---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: simple-csi-node
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: simple-csi-node
  template:
    metadata:
      labels:
        app: simple-csi-node
    spec:
      serviceAccountName: simple-csi-node-sa
      hostNetwork: true
      containers:
      - name: node-driver-registrar
        image: k8s.gcr.io/sig-storage/csi-node-driver-registrar:v2.5.0
        args:
        - --v=2
        - --csi-address=/csi/csi.sock
        - --kubelet-registration-path=/var/lib/kubelet/plugins/simple-csi/csi.sock
        env:
        - name: KUBE_NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        volumeMounts:
        - name: plugin-dir
          mountPath: /csi/
        - name: registration-dir
          mountPath: /registration/
      - name: simple-csi
        image: simple-csi:latest
        args:
        - --endpoint=$(CSI_ENDPOINT)
        - --nodeid=$(NODE_ID)
        env:
        - name: CSI_ENDPOINT
          value: unix:///csi/csi.sock
        - name: NODE_ID
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        securityContext:
          privileged: true
          capabilities:
            add: ["SYS_ADMIN"]
          allowPrivilegeEscalation: true
        volumeMounts:
        - name: plugin-dir
          mountPath: /csi
        - name: pods-mount-dir
          mountPath: /var/lib/kubelet
          mountPropagation: "Bidirectional"
        - name: device-dir
          mountPath: /dev
      volumes:
      - name: registration-dir
        hostPath:
          path: /var/lib/kubelet/plugins_registry/
          type: DirectoryOrCreate
      - name: plugin-dir
        hostPath:
          path: /var/lib/kubelet/plugins/simple-csi/
          type: DirectoryOrCreate
      - name: pods-mount-dir
        hostPath:
          path: /var/lib/kubelet
          type: Directory
      - name: device-dir
        hostPath:
          path: /dev

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: simple-csi-controller
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: simple-csi-controller
  template:
    metadata:
      labels:
        app: simple-csi-controller
    spec:
      serviceAccountName: simple-csi-controller-sa
      containers:
      - name: csi-provisioner
        image: k8s.gcr.io/sig-storage/csi-provisioner:v3.2.0
        args:
        - --csi-address=$(ADDRESS)
        - --v=2
        env:
        - name: ADDRESS
          value: /var/lib/csi/sockets/pluginproxy/csi.sock
        volumeMounts:
        - name: socket-dir
          mountPath: /var/lib/csi/sockets/pluginproxy/
      - name: csi-attacher
        image: k8s.gcr.io/sig-storage/csi-attacher:v3.5.0
        args:
        - --v=2
        - --csi-address=$(ADDRESS)
        env:
        - name: ADDRESS
          value: /var/lib/csi/sockets/pluginproxy/csi.sock
        volumeMounts:
        - name: socket-dir
          mountPath: /var/lib/csi/sockets/pluginproxy/
      - name: simple-csi
        image: simple-csi:latest
        args:
        - --endpoint=$(CSI_ENDPOINT)
        env:
        - name: CSI_ENDPOINT
          value: unix:///var/lib/csi/sockets/pluginproxy/csi.sock
        volumeMounts:
        - name: socket-dir
          mountPath: /var/lib/csi/sockets/pluginproxy/
      volumes:
      - name: socket-dir
        emptyDir: {}

---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: simple-csi
provisioner: simple-csi
parameters:
  type: "local"
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true

12.8 设备插件开发

12.8.1 设备插件基础

// 设备插件示例
package main

import (
    "context"
    "fmt"
    "net"
    "os"
    "path"
    "time"

    "google.golang.org/grpc"
    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
    "k8s.io/klog/v2"
)

const (
    resourceName = "example.com/gpu"
    serverSock   = pluginapi.DevicePluginPath + "gpu.sock"
)

// GPUDevicePlugin GPU设备插件
type GPUDevicePlugin struct {
    socket string
    server *grpc.Server
    devices map[string]*pluginapi.Device
}

// NewGPUDevicePlugin 创建新的GPU设备插件
func NewGPUDevicePlugin() *GPUDevicePlugin {
    return &GPUDevicePlugin{
        socket:  serverSock,
        devices: make(map[string]*pluginapi.Device),
    }
}

// GetDevicePluginOptions 获取设备插件选项
func (m *GPUDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
    return &pluginapi.DevicePluginOptions{}, nil
}

// ListAndWatch 列出并监控设备
func (m *GPUDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    klog.Info("ListAndWatch called")
    
    // 发现设备
    m.discoverDevices()
    
    // 发送设备列表
    devs := make([]*pluginapi.Device, 0, len(m.devices))
    for _, dev := range m.devices {
        devs = append(devs, dev)
    }
    
    resp := &pluginapi.ListAndWatchResponse{Devices: devs}
    if err := s.Send(resp); err != nil {
        klog.Errorf("Failed to send response: %v", err)
        return err
    }
    
    // 持续监控设备变化
    for {
        time.Sleep(10 * time.Second)
        // 这里可以检查设备状态变化
        // 如果有变化,重新发送设备列表
    }
}

// Allocate 分配设备
func (m *GPUDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    klog.Info("Allocate called")
    
    responses := &pluginapi.AllocateResponse{}
    
    for _, req := range reqs.ContainerRequests {
        response := &pluginapi.ContainerAllocateResponse{
            Envs: map[string]string{
                "GPU_DEVICES": fmt.Sprintf("%v", req.DevicesIDs),
            },
            Mounts: []*pluginapi.Mount{
                {
                    ContainerPath: "/dev/gpu0",
                    HostPath:      "/dev/gpu0",
                    ReadOnly:      false,
                },
            },
            Devices: []*pluginapi.DeviceSpec{
                {
                    ContainerPath: "/dev/gpu0",
                    HostPath:      "/dev/gpu0",
                    Permissions:   "rwm",
                },
            },
        }
        
        // 校验请求的设备是否存在;设备的分配状态由kubelet负责跟踪,
        // 插件不应将已分配设备标记为Unhealthy,否则会被从可分配资源中剔除
        for _, id := range req.DevicesIDs {
            if _, exists := m.devices[id]; !exists {
                return nil, fmt.Errorf("unknown device: %s", id)
            }
        }
        
        responses.ContainerResponses = append(responses.ContainerResponses, response)
    }
    
    return responses, nil
}

// GetPreferredAllocation 获取首选分配
func (m *GPUDevicePlugin) GetPreferredAllocation(ctx context.Context, req *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
    return &pluginapi.PreferredAllocationResponse{}, nil
}

// PreStartContainer 容器启动前处理
func (m *GPUDevicePlugin) PreStartContainer(ctx context.Context, req *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
    return &pluginapi.PreStartContainerResponse{}, nil
}

// discoverDevices 发现设备
func (m *GPUDevicePlugin) discoverDevices() {
    // 模拟发现GPU设备
    for i := 0; i < 2; i++ {
        deviceID := fmt.Sprintf("gpu-%d", i)
        m.devices[deviceID] = &pluginapi.Device{
            ID:     deviceID,
            Health: pluginapi.Healthy,
        }
    }
    klog.Infof("Discovered %d GPU devices", len(m.devices))
}

// Start 启动设备插件
func (m *GPUDevicePlugin) Start() error {
    err := m.cleanup()
    if err != nil {
        return err
    }

    sock, err := net.Listen("unix", m.socket)
    if err != nil {
        return err
    }

    m.server = grpc.NewServer()
    pluginapi.RegisterDevicePluginServer(m.server, m)

    go func() {
        err := m.server.Serve(sock)
        if err != nil {
            klog.Errorf("Failed to serve: %v", err)
        }
    }()

    // 等待服务器启动
    conn, err := m.dial(m.socket, 5*time.Second)
    if err != nil {
        return err
    }
    conn.Close()

    return nil
}

// Stop 停止设备插件
func (m *GPUDevicePlugin) Stop() error {
    if m.server == nil {
        return nil
    }
    m.server.Stop()
    m.server = nil
    return m.cleanup()
}

// Register 注册设备插件
func (m *GPUDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
    conn, err := m.dial(kubeletEndpoint, 5*time.Second)
    if err != nil {
        return err
    }
    defer conn.Close()

    client := pluginapi.NewRegistrationClient(conn)
    reqt := &pluginapi.RegisterRequest{
        Version:      pluginapi.Version,
        Endpoint:     path.Base(m.socket),
        ResourceName: resourceName,
    }

    _, err = client.Register(context.Background(), reqt)
    if err != nil {
        return err
    }
    return nil
}

// dial 连接到指定端点
func (m *GPUDevicePlugin) dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) {
    c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
        grpc.WithTimeout(timeout),
        grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
            return net.DialTimeout("unix", addr, timeout)
        }),
    )

    if err != nil {
        return nil, err
    }

    return c, nil
}

// cleanup 清理资源
func (m *GPUDevicePlugin) cleanup() error {
    if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) {
        return err
    }
    return nil
}

func main() {
    klog.Info("Starting GPU device plugin")
    
    plugin := NewGPUDevicePlugin()
    
    if err := plugin.Start(); err != nil {
        klog.Fatalf("Failed to start device plugin: %v", err)
    }
    
    if err := plugin.Register(pluginapi.KubeletSocket, resourceName); err != nil {
        klog.Fatalf("Failed to register device plugin: %v", err)
    }
    
    klog.Info("GPU device plugin registered")
    
    // 保持运行
    select {}
}
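
kubelet重启时会重建 /var/lib/kubelet/device-plugins/ 下的kubelet.sock,已注册的设备插件需要重新注册才能继续上报资源。下面是一个基于 github.com/fsnotify/fsnotify 的监控与重注册示意(假设性的补充实现,需额外引入该依赖,可在main中以goroutine方式运行):

// watchKubeletRestart 监控kubelet socket重建事件,在kubelet重启后重新启动并注册插件
// 需要额外引入 "github.com/fsnotify/fsnotify"
func watchKubeletRestart(plugin *GPUDevicePlugin) error {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        return err
    }
    defer watcher.Close()

    // 监控设备插件目录
    if err := watcher.Add(pluginapi.DevicePluginPath); err != nil {
        return err
    }

    for event := range watcher.Events {
        // kubelet.sock被重新创建,说明kubelet发生了重启
        if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
            klog.Info("kubelet restarted, re-registering device plugin")
            plugin.Stop()
            if err := plugin.Start(); err != nil {
                return err
            }
            if err := plugin.Register(pluginapi.KubeletSocket, resourceName); err != nil {
                return err
            }
        }
    }
    return nil
}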

12.8.2 设备插件部署

# 设备插件DaemonSet
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: gpu-device-plugin
  namespace: kube-system
spec:
  selector:
    matchLabels:
      name: gpu-device-plugin
  template:
    metadata:
      labels:
        name: gpu-device-plugin
    spec:
      tolerations:
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule
      containers:
      - image: gpu-device-plugin:latest
        name: gpu-device-plugin
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop: ["ALL"]
        volumeMounts:
        - name: device-plugin
          mountPath: /var/lib/kubelet/device-plugins
        - name: dev
          mountPath: /dev
      volumes:
      - name: device-plugin
        hostPath:
          path: /var/lib/kubelet/device-plugins
      - name: dev
        hostPath:
          path: /dev
      nodeSelector:
        accelerator: gpu

12.9 准入控制器

12.9.1 Validating Admission Webhook

// 验证准入控制器
package main

import (
    "crypto/tls"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"
    "strings"

    admissionv1 "k8s.io/api/admission/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/serializer"
    "k8s.io/klog/v2"
)

var (
    scheme = runtime.NewScheme()
    codecs = serializer.NewCodecFactory(scheme)
)

// WebhookServer Webhook服务器
type WebhookServer struct {
    server *http.Server
}

// WhSvrParameters Webhook服务器参数
type WhSvrParameters struct {
    port     int    // webhook服务器端口
    certFile string // TLS证书文件路径
    keyFile  string // TLS私钥文件路径
}

// 验证Pod的函数
func (whsvr *WebhookServer) validate(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
    req := ar.Request
    var pod corev1.Pod
    
    if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
        klog.Errorf("Could not unmarshal raw object: %v", err)
        return &admissionv1.AdmissionResponse{
            Result: &metav1.Status{
                Message: err.Error(),
            },
        }
    }

    klog.Infof("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v patchOperation=%v UserInfo=%v",
        req.Kind, req.Namespace, req.Name, req.UID, req.Operation, req.UserInfo)

    allowed := true
    message := ""

    // 验证规则1: 检查是否使用了特权容器
    for _, container := range pod.Spec.Containers {
        if container.SecurityContext != nil && container.SecurityContext.Privileged != nil && *container.SecurityContext.Privileged {
            allowed = false
            message = "Privileged containers are not allowed"
            break
        }
    }

    // 验证规则2: 检查是否使用了hostNetwork
    if allowed && pod.Spec.HostNetwork {
        allowed = false
        message = "HostNetwork is not allowed"
    }

    // 验证规则3: 检查镜像来源
    if allowed {
        for _, container := range pod.Spec.Containers {
            if !strings.HasPrefix(container.Image, "registry.company.com/") {
                allowed = false
                message = fmt.Sprintf("Image %s is not from approved registry", container.Image)
                break
            }
        }
    }

    // 验证规则4: 检查资源限制
    if allowed {
        for _, container := range pod.Spec.Containers {
            if container.Resources.Limits == nil {
                allowed = false
                message = "Resource limits must be specified"
                break
            }
            if container.Resources.Limits.Cpu().IsZero() || container.Resources.Limits.Memory().IsZero() {
                allowed = false
                message = "CPU and memory limits must be specified"
                break
            }
        }
    }

    return &admissionv1.AdmissionResponse{
        UID:     req.UID,
        Allowed: allowed,
        Result: &metav1.Status{
            Message: message,
        },
    }
}

// 处理验证请求
func (whsvr *WebhookServer) serve(w http.ResponseWriter, r *http.Request) {
    var body []byte
    if r.Body != nil {
        if data, err := ioutil.ReadAll(r.Body); err == nil {
            body = data
        }
    }
    if len(body) == 0 {
        klog.Error("empty body")
        http.Error(w, "empty body", http.StatusBadRequest)
        return
    }

    // 验证content-type
    contentType := r.Header.Get("Content-Type")
    if contentType != "application/json" {
        klog.Errorf("Content-Type=%s, expect application/json", contentType)
        http.Error(w, "invalid Content-Type, expect `application/json`", http.StatusUnsupportedMediaType)
        return
    }

    var admissionResponse *admissionv1.AdmissionResponse
    ar := admissionv1.AdmissionReview{}
    if _, _, err := codecs.UniversalDeserializer().Decode(body, nil, &ar); err != nil {
        klog.Errorf("Could not decode body: %v", err)
        admissionResponse = &admissionv1.AdmissionResponse{
            Result: &metav1.Status{
                Message: err.Error(),
            },
        }
    } else {
        admissionResponse = whsvr.validate(&ar)
    }

    // admission/v1 要求响应中带有与请求一致的apiVersion和kind
    admissionReview := admissionv1.AdmissionReview{
        TypeMeta: metav1.TypeMeta{
            APIVersion: "admission.k8s.io/v1",
            Kind:       "AdmissionReview",
        },
    }
    if admissionResponse != nil {
        admissionReview.Response = admissionResponse
        if ar.Request != nil {
            admissionReview.Response.UID = ar.Request.UID
        }
    }

    respBytes, _ := json.Marshal(admissionReview)
    w.Header().Set("Content-Type", "application/json")
    if _, err := w.Write(respBytes); err != nil {
        klog.Error(err)
    }
}

func main() {
    var parameters WhSvrParameters
    parameters.port = 8443
    parameters.certFile = "/etc/webhook/certs/tls.crt"
    parameters.keyFile = "/etc/webhook/certs/tls.key"

    cert, err := tls.LoadX509KeyPair(parameters.certFile, parameters.keyFile)
    if err != nil {
        klog.Fatalf("Failed to load key pair: %v", err)
    }

    server := &WebhookServer{
        server: &http.Server{
            Addr:      fmt.Sprintf(":%v", parameters.port),
            TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},
        },
    }

    mux := http.NewServeMux()
    mux.HandleFunc("/validate", server.serve)
    server.server.Handler = mux

    klog.Info("Starting webhook server...")
    if err := server.server.ListenAndServeTLS("", ""); err != nil {
        klog.Errorf("Failed to start webhook server: %v", err)
    }
}
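
注意:后文的部署清单为Webhook Pod配置了 /health 与 /ready 探针,因此服务端还需要注册对应的处理函数。下面是一个最小化的示意(假设性补充,注册方式与 /validate 相同):

// healthHandler 简单的健康/就绪检查处理函数
// 注册示例:
//   mux.HandleFunc("/health", healthHandler)
//   mux.HandleFunc("/ready", healthHandler)
func healthHandler(w http.ResponseWriter, r *http.Request) {
    w.WriteHeader(http.StatusOK)
    _, _ = w.Write([]byte("ok"))
}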

12.9.2 Mutating Admission Webhook

// 变更准入控制器(复用前文的WebhookServer结构与serve/main函数,
// 只需在路由中增加 mux.HandleFunc("/mutate", ...),此处仅给出mutate处理函数)
package main

import (
    "encoding/json"
    "fmt"

    admissionv1 "k8s.io/api/admission/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/klog/v2"
)

// 变更Pod的函数
func (whsvr *WebhookServer) mutate(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
    req := ar.Request
    var pod corev1.Pod
    
    if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
        klog.Errorf("Could not unmarshal raw object: %v", err)
        return &admissionv1.AdmissionResponse{
            Result: &metav1.Status{
                Message: err.Error(),
            },
        }
    }

    klog.Infof("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v patchOperation=%v UserInfo=%v",
        req.Kind, req.Namespace, req.Name, req.UID, req.Operation, req.UserInfo)

    var patches []map[string]interface{}

    // 变更规则1: 添加默认标签
    if pod.Labels == nil {
        patches = append(patches, map[string]interface{}{
            "op":    "add",
            "path":  "/metadata/labels",
            "value": map[string]string{},
        })
    }
    
    patches = append(patches, map[string]interface{}{
        "op":    "add",
        "path":  "/metadata/labels/injected-by",
        "value": "admission-webhook",
    })
    
    patches = append(patches, map[string]interface{}{
        "op":    "add",
        "path":  "/metadata/labels/injection-timestamp",
        // label值不允许包含冒号,这里使用不含冒号的时间格式
        "value": metav1.Now().Format("20060102-150405"),
    })

    // 变更规则2: 添加sidecar容器
    if req.Namespace != "kube-system" && req.Namespace != "kube-public" {
        sidecarContainer := corev1.Container{
            Name:  "logging-sidecar",
            Image: "fluent/fluent-bit:latest",
            VolumeMounts: []corev1.VolumeMount{
                {
                    Name:      "varlog",
                    MountPath: "/var/log",
                },
            },
        }
        
        patches = append(patches, map[string]interface{}{
            "op":    "add",
            "path":  "/spec/containers/-",
            "value": sidecarContainer,
        })
        
        // 添加对应的卷
        logVolume := corev1.Volume{
            Name: "varlog",
            VolumeSource: corev1.VolumeSource{
                EmptyDir: &corev1.EmptyDirVolumeSource{},
            },
        }
        
        if len(pod.Spec.Volumes) == 0 {
            patches = append(patches, map[string]interface{}{
                "op":    "add",
                "path":  "/spec/volumes",
                "value": []corev1.Volume{logVolume},
            })
        } else {
            patches = append(patches, map[string]interface{}{
                "op":    "add",
                "path":  "/spec/volumes/-",
                "value": logVolume,
            })
        }
    }

    // 变更规则3: 设置默认资源限制
    for i, container := range pod.Spec.Containers {
        if container.Resources.Limits == nil {
            patches = append(patches, map[string]interface{}{
                "op":   "add",
                "path": fmt.Sprintf("/spec/containers/%d/resources/limits", i),
                "value": map[string]string{
                    "cpu":    "500m",
                    "memory": "512Mi",
                },
            })
        }
        
        if container.Resources.Requests == nil {
            patches = append(patches, map[string]interface{}{
                "op":   "add",
                "path": fmt.Sprintf("/spec/containers/%d/resources/requests", i),
                "value": map[string]string{
                    "cpu":    "100m",
                    "memory": "128Mi",
                },
            })
        }
    }

    // 变更规则4: 添加安全上下文
    for i, container := range pod.Spec.Containers {
        if container.SecurityContext == nil {
            patches = append(patches, map[string]interface{}{
                "op":   "add",
                "path": fmt.Sprintf("/spec/containers/%d/securityContext", i),
                "value": map[string]interface{}{
                    "runAsNonRoot":             true,
                    "runAsUser":                1000,
                    "allowPrivilegeEscalation": false,
                    "readOnlyRootFilesystem":   true,
                    "capabilities": map[string]interface{}{
                        "drop": []string{"ALL"},
                    },
                },
            })
        }
    }

    patchBytes, _ := json.Marshal(patches)
    
    return &admissionv1.AdmissionResponse{
        UID:     req.UID,
        Allowed: true,
        Patch:   patchBytes,
        PatchType: func() *admissionv1.PatchType {
            pt := admissionv1.PatchTypeJSONPatch
            return &pt
        }(),
    }
}

12.9.3 准入控制器部署

# 准入控制器部署
apiVersion: apps/v1
kind: Deployment
metadata:
  name: admission-webhook
  namespace: webhook-system
spec:
  replicas: 2
  selector:
    matchLabels:
      app: admission-webhook
  template:
    metadata:
      labels:
        app: admission-webhook
    spec:
      serviceAccountName: admission-webhook
      containers:
      - name: webhook
        image: admission-webhook:latest
        ports:
        - containerPort: 8443
          name: webhook-api
        volumeMounts:
        - name: webhook-tls-certs
          mountPath: /etc/webhook/certs
          readOnly: true
        env:
        - name: TLS_CERT_FILE
          value: /etc/webhook/certs/tls.crt
        - name: TLS_PRIVATE_KEY_FILE
          value: /etc/webhook/certs/tls.key
        resources:
          limits:
            cpu: 500m
            memory: 512Mi
          requests:
            cpu: 100m
            memory: 128Mi
        livenessProbe:
          httpGet:
            path: /health
            port: 8443
            scheme: HTTPS
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8443
            scheme: HTTPS
          initialDelaySeconds: 5
          periodSeconds: 5
      volumes:
      - name: webhook-tls-certs
        secret:
          secretName: webhook-tls

---
apiVersion: v1
kind: Service
metadata:
  name: admission-webhook-service
  namespace: webhook-system
spec:
  selector:
    app: admission-webhook
  ports:
  - name: webhook-api
    port: 443
    targetPort: 8443
    protocol: TCP

---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: pod-policy-webhook
webhooks:
- name: pod-policy.example.com
  clientConfig:
    service:
      name: admission-webhook-service
      namespace: webhook-system
      path: "/validate"
    caBundle: LS0tLS1CRUdJTi... # Base64编码的CA证书
  rules:
  - operations: ["CREATE", "UPDATE"]
    apiGroups: [""]
    apiVersions: ["v1"]
    resources: ["pods"]
  admissionReviewVersions: ["v1", "v1beta1"]
  sideEffects: None
  failurePolicy: Fail

---
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: pod-mutating-webhook
webhooks:
- name: pod-mutating.example.com
  clientConfig:
    service:
      name: admission-webhook-service
      namespace: webhook-system
      path: "/mutate"
    caBundle: LS0tLS1CRUdJTi... # Base64编码的CA证书
  rules:
  - operations: ["CREATE"]
    apiGroups: [""]
    apiVersions: ["v1"]
    resources: ["pods"]
  admissionReviewVersions: ["v1", "v1beta1"]
  sideEffects: None
  failurePolicy: Fail

12.10 扩展机制最佳实践

12.10.1 开发最佳实践

#!/bin/bash
# 扩展开发检查脚本

echo "=== Kubernetes扩展开发最佳实践检查 ==="

# 1. CRD设计检查
check_crd_design() {
    echo "\n1. CRD设计检查:"
    
    # 检查CRD版本策略
    echo "  - 检查CRD版本策略..."
    kubectl get crd -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.spec.versions[*].name}{"\n"}{end}' | \
    while read crd versions; do
        if [[ $(echo $versions | wc -w) -gt 1 ]]; then
            echo "    ✓ CRD $crd 支持多版本: $versions"
        else
            echo "    ⚠ CRD $crd 只有单版本: $versions"
        fi
    done
    
    # 检查CRD字段验证
    echo "  - 检查CRD字段验证..."
    kubectl get crd -o json | jq -r '.items[] | select(.spec.versions[].schema.openAPIV3Schema.properties != null) | .metadata.name' | \
    while read crd; do
        echo "    ✓ CRD $crd 包含字段验证"
    done
}

# 2. Operator模式检查
check_operator_pattern() {
    echo "\n2. Operator模式检查:"
    
    # 检查控制器部署
    echo "  - 检查控制器部署..."
    kubectl get deployment -A -l "app.kubernetes.io/component=controller" -o jsonpath='{range .items[*]}{.metadata.namespace}{"\t"}{.metadata.name}{"\t"}{.spec.replicas}{"\n"}{end}' | \
    while read namespace name replicas; do
        if [[ $replicas -gt 1 ]]; then
            echo "    ✓ 控制器 $namespace/$name 配置了高可用 (replicas: $replicas)"
        else
            echo "    ⚠ 控制器 $namespace/$name 未配置高可用 (replicas: $replicas)"
        fi
    done
    
    # 检查RBAC权限
    echo "  - 检查RBAC权限..."
    kubectl get clusterrole -o json | jq -r '.items[] | select(.rules[]?.verbs[]? == "*") | .metadata.name' | \
    while read role; do
        echo "    ⚠ ClusterRole $role 拥有过于宽泛的权限"
    done
}

# 3. 准入控制器检查
check_admission_controllers() {
    echo "\n3. 准入控制器检查:"
    
    # 检查Webhook配置
    echo "  - 检查ValidatingAdmissionWebhook..."
    kubectl get validatingadmissionwebhook -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.webhooks[*].failurePolicy}{"\n"}{end}' | \
    while read webhook policy; do
        if [[ "$policy" == "Fail" ]]; then
            echo "    ✓ Webhook $webhook 配置了Fail策略"
        else
            echo "    ⚠ Webhook $webhook 配置了Ignore策略,可能影响安全性"
        fi
    done
    
    echo "  - 检查MutatingAdmissionWebhook..."
    kubectl get mutatingadmissionwebhook -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.webhooks[*].sideEffects}{"\n"}{end}' | \
    while read webhook sideEffects; do
        if [[ "$sideEffects" == "None" ]]; then
            echo "    ✓ Webhook $webhook 声明无副作用"
        else
            echo "    ⚠ Webhook $webhook 可能有副作用: $sideEffects"
        fi
    done
}

# 4. 性能和可靠性检查
check_performance_reliability() {
    echo "\n4. 性能和可靠性检查:"
    
    # 检查资源限制
    echo "  - 检查扩展组件资源限制..."
    kubectl get deployment -A -o json | jq -r '.items[] | select(.metadata.labels["app.kubernetes.io/component"] == "controller") | "\(.metadata.namespace)/\(.metadata.name)\t\(.spec.template.spec.containers[0].resources.limits // "none")"' | \
    while read deployment limits; do
        if [[ "$limits" == "none" ]]; then
            echo "    ⚠ 部署 $deployment 未设置资源限制"
        else
            echo "    ✓ 部署 $deployment 设置了资源限制"
        fi
    done
    
    # 检查健康检查
    echo "  - 检查健康检查配置..."
    kubectl get deployment -A -o json | jq -r '.items[] | select(.metadata.labels["app.kubernetes.io/component"] == "controller") | "\(.metadata.namespace)/\(.metadata.name)\t\(.spec.template.spec.containers[0].livenessProbe != null)\t\(.spec.template.spec.containers[0].readinessProbe != null)"' | \
    while read deployment liveness readiness; do
        if [[ "$liveness" == "true" && "$readiness" == "true" ]]; then
            echo "    ✓ 部署 $deployment 配置了完整的健康检查"
        else
            echo "    ⚠ 部署 $deployment 缺少健康检查配置"
        fi
    done
}

# 5. 安全性检查
check_security() {
    echo "\n5. 安全性检查:"
    
    # 检查ServiceAccount
    echo "  - 检查ServiceAccount使用..."
    kubectl get deployment -A -o json | jq -r '.items[] | select(.metadata.labels["app.kubernetes.io/component"] == "controller") | "\(.metadata.namespace)/\(.metadata.name)\t\(.spec.template.spec.serviceAccountName // "default")"' | \
    while read deployment sa; do
        if [[ "$sa" == "default" ]]; then
            echo "    ⚠ 部署 $deployment 使用默认ServiceAccount"
        else
            echo "    ✓ 部署 $deployment 使用专用ServiceAccount: $sa"
        fi
    done
    
    # 检查安全上下文
    echo "  - 检查安全上下文..."
    kubectl get deployment -A -o json | jq -r '.items[] | select(.metadata.labels["app.kubernetes.io/component"] == "controller") | "\(.metadata.namespace)/\(.metadata.name)\t\(.spec.template.spec.containers[0].securityContext.runAsNonRoot // false)"' | \
    while read deployment nonRoot; do
        if [[ "$nonRoot" == "true" ]]; then
            echo "    ✓ 部署 $deployment 配置了非root运行"
        else
            echo "    ⚠ 部署 $deployment 可能以root用户运行"
        fi
    done
}

# 执行所有检查
check_crd_design
check_operator_pattern
check_admission_controllers
check_performance_reliability
check_security

echo "\n=== 检查完成 ==="

12.10.2 部署和运维最佳实践

# 扩展组件部署模板
apiVersion: v1
kind: Namespace
metadata:
  name: extension-system
  labels:
    name: extension-system
    pod-security.kubernetes.io/enforce: restricted
    pod-security.kubernetes.io/audit: restricted
    pod-security.kubernetes.io/warn: restricted

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: extension-controller
  namespace: extension-system
automountServiceAccountToken: true

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: extension-controller
rules:
- apiGroups: [""]
  resources: ["events"]
  verbs: ["create", "patch"]
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
  resources: ["deployments"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["custom.io"]
  resources: ["customresources"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["custom.io"]
  resources: ["customresources/status"]
  verbs: ["get", "update", "patch"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: extension-controller
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: extension-controller
subjects:
- kind: ServiceAccount
  name: extension-controller
  namespace: extension-system

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: extension-controller
  namespace: extension-system
  labels:
    app.kubernetes.io/name: extension-controller
    app.kubernetes.io/component: controller
    app.kubernetes.io/version: "1.0.0"
spec:
  replicas: 2
  selector:
    matchLabels:
      app.kubernetes.io/name: extension-controller
  template:
    metadata:
      labels:
        app.kubernetes.io/name: extension-controller
        app.kubernetes.io/component: controller
    spec:
      serviceAccountName: extension-controller
      securityContext:
        runAsNonRoot: true
        runAsUser: 65532
        fsGroup: 65532
        seccompProfile:
          type: RuntimeDefault
      containers:
      - name: controller
        image: extension-controller:v1.0.0
        imagePullPolicy: IfNotPresent
        command:
        - /manager
        args:
        - --leader-elect
        - --metrics-bind-address=:8080
        - --health-probe-bind-address=:8081
        env:
        - name: WATCH_NAMESPACE
          value: ""
        ports:
        - name: metrics
          containerPort: 8080
          protocol: TCP
        - name: health
          containerPort: 8081
          protocol: TCP
        livenessProbe:
          httpGet:
            path: /healthz
            port: health
          initialDelaySeconds: 15
          periodSeconds: 20
        readinessProbe:
          httpGet:
            path: /readyz
            port: health
          initialDelaySeconds: 5
          periodSeconds: 10
        resources:
          limits:
            cpu: 500m
            memory: 512Mi
          requests:
            cpu: 100m
            memory: 128Mi
        securityContext:
          allowPrivilegeEscalation: false
          readOnlyRootFilesystem: true
          capabilities:
            drop:
            - ALL
        volumeMounts:
        - name: tmp
          mountPath: /tmp
      volumes:
      - name: tmp
        emptyDir: {}
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app.kubernetes.io/name
                  operator: In
                  values:
                  - extension-controller
              topologyKey: kubernetes.io/hostname
      tolerations:
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule
      - key: node-role.kubernetes.io/control-plane
        operator: Exists
        effect: NoSchedule

12.11 总结

本章详细介绍了Kubernetes的高级特性和扩展机制,包括:

核心扩展机制

  1. 自定义资源定义(CRD): 扩展Kubernetes API,定义自定义资源类型
  2. Operator模式: 基于CRD实现应用程序的自动化运维
  3. 调度器扩展: 自定义调度逻辑,优化Pod调度策略
  4. 网络插件(CNI): 实现自定义网络解决方案
  5. 存储插件(CSI): 集成外部存储系统
  6. 设备插件: 管理和分配特殊硬件资源
  7. 准入控制器: 实现策略验证和资源变更

开发要点

  • CRD设计: 合理的API设计、版本管理和字段验证
  • 控制器模式: 声明式API、控制循环和状态协调
  • 错误处理: 重试机制、状态更新和事件记录
  • 性能优化: 缓存机制、批量处理和资源限制

部署最佳实践

  • 安全性: 最小权限原则、安全上下文和网络策略
  • 可靠性: 高可用部署、健康检查和故障恢复
  • 可观测性: 指标监控、日志记录和链路追踪
  • 运维友好: 配置管理、版本控制和升级策略

应用场景

  • 应用管理: 数据库、消息队列等有状态应用的自动化管理
  • 基础设施: 网络、存储、安全等基础设施组件的集成
  • 策略执行: 安全策略、合规检查和资源配额的自动化执行
  • 工作流编排: 复杂业务流程的自动化编排和执行

Kubernetes的扩展机制为用户提供了强大的定制能力,通过合理使用这些机制,可以构建适合特定业务需求的容器平台。在下一章中,我们将学习Kubernetes生态系统和工具链,了解如何利用社区工具提升开发和运维效率。