12.1 概述
Kubernetes作为一个成熟的容器编排平台,提供了丰富的高级特性和强大的扩展机制。本章将深入探讨这些高级功能,包括自定义资源定义(CRD)、Operator模式、准入控制器、调度器扩展、网络插件开发等内容,帮助你更好地理解和扩展Kubernetes的能力。
12.1.1 高级特性概览
- 自定义资源定义(CRD): 扩展Kubernetes API
- Operator模式: 自动化应用管理
- 准入控制器: 请求验证和变更
- 调度器扩展: 自定义调度逻辑
- 网络插件: CNI插件开发
- 存储插件: CSI驱动开发
- 设备插件: GPU等特殊资源管理
12.1.2 扩展机制架构
# Kubernetes扩展架构图
apiVersion: v1
kind: ConfigMap
metadata:
name: k8s-extension-architecture
data:
architecture.md: |
# Kubernetes扩展架构
## API扩展层
┌─────────────────────────────────────────┐
│ kubectl/API客户端 │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ kube-apiserver │
│ ┌─────────────────────────────────────┐ │
│ │ 准入控制器链 │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │ VAC │ │ MAC │ │ ... │ │ WAC │ │ │
│ │ └─────┘ └─────┘ └─────┘ └─────┘ │ │
│ └─────────────────────────────────────┘ │
│ ┌─────────────────────────────────────┐ │
│ │ API聚合层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 核心API │ │ 扩展API │ │ │
│ │ │ │ │ (CRD) │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────┘ │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ etcd存储 │
└─────────────────────────────────────────┘
## 控制器扩展层
┌─────────────────────────────────────────┐
│ kube-controller-manager │
│ ┌─────────────────────────────────────┐ │
│ │ 内置控制器 │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │ RC │ │ RS │ │ Dep │ │ ... │ │ │
│ │ └─────┘ └─────┘ └─────┘ └─────┘ │ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 自定义控制器 │
│ ┌─────────────────────────────────────┐ │
│ │ Operator │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │ DB │ │ MQ │ │ App │ │ ... │ │ │
│ │ └─────┘ └─────┘ └─────┘ └─────┘ │ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────┘
## 运行时扩展层
┌─────────────────────────────────────────┐
│ kubelet │
│ ┌─────────────────────────────────────┐ │
│ │ 插件接口 │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │ CNI │ │ CSI │ │ CRI │ │ DPI │ │ │
│ │ └─────┘ └─────┘ └─────┘ └─────┘ │ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────┘
## 调度扩展层
┌─────────────────────────────────────────┐
│ kube-scheduler │
│ ┌─────────────────────────────────────┐ │
│ │ 调度框架 │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │ Pre │ │Filt │ │Score│ │Post │ │ │
│ │ └─────┘ └─────┘ └─────┘ └─────┘ │ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────┘
12.2 自定义资源定义(CRD)
12.2.1 CRD基础概念
自定义资源定义(Custom Resource Definition,CRD)允许用户扩展Kubernetes API,定义自己的资源类型。
# 基础CRD示例
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: webapps.example.com
spec:
group: example.com
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
replicas:
type: integer
minimum: 1
maximum: 100
image:
type: string
port:
type: integer
minimum: 1
maximum: 65535
resources:
type: object
properties:
cpu:
type: string
memory:
type: string
required:
- replicas
- image
- port
status:
type: object
properties:
phase:
type: string
enum:
- Pending
- Running
- Failed
readyReplicas:
type: integer
conditions:
type: array
items:
type: object
properties:
type:
type: string
status:
type: string
reason:
type: string
message:
type: string
lastTransitionTime:
type: string
format: date-time
required:
- spec
additionalPrinterColumns:
- name: Replicas
type: integer
description: The number of replicas
jsonPath: .spec.replicas
- name: Ready
type: integer
description: The number of ready replicas
jsonPath: .status.readyReplicas
- name: Phase
type: string
description: The phase of the webapp
jsonPath: .status.phase
- name: Age
type: date
jsonPath: .metadata.creationTimestamp
scope: Namespaced
names:
plural: webapps
singular: webapp
kind: WebApp
shortNames:
- wa
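上面CRD中的openAPIV3Schema会由apiserver在创建和更新时自动执行校验。为了直观说明这些约束的语义,下面用一段独立的Go代码模拟同样的校验规则(仅为示意,WebAppSpec结构体和ValidateWebAppSpec函数都是本示例的假设,真实校验由apiserver完成):

```go
package main

import (
	"fmt"
	"strings"
)

// WebAppSpec 对应上面CRD schema中spec的三个必填字段(示意用)
type WebAppSpec struct {
	Replicas int
	Image    string
	Port     int
}

// ValidateWebAppSpec 模拟apiserver按openAPIV3Schema执行的约束检查
func ValidateWebAppSpec(s WebAppSpec) error {
	if s.Replicas < 1 || s.Replicas > 100 { // 对应minimum: 1 / maximum: 100
		return fmt.Errorf("replicas must be between 1 and 100, got %d", s.Replicas)
	}
	if strings.TrimSpace(s.Image) == "" { // image为required字段
		return fmt.Errorf("image is required")
	}
	if s.Port < 1 || s.Port > 65535 { // 对应minimum: 1 / maximum: 65535
		return fmt.Errorf("port must be between 1 and 65535, got %d", s.Port)
	}
	return nil
}

func main() {
	fmt.Println(ValidateWebAppSpec(WebAppSpec{Replicas: 3, Image: "nginx:1.21", Port: 80}))
	fmt.Println(ValidateWebAppSpec(WebAppSpec{Replicas: 0, Image: "nginx:1.21", Port: 80}))
}
```

可以看到,schema校验本质上就是这类声明式的边界与必填检查,只是由apiserver统一执行,客户端无须自己实现。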
12.2.2 高级CRD特性
# 高级CRD特性示例
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: databases.db.example.com
spec:
group: db.example.com
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
type:
type: string
enum:
- mysql
- postgresql
- mongodb
version:
type: string
pattern: '^[0-9]+\.[0-9]+\.[0-9]+$'
storage:
type: object
properties:
size:
type: string
pattern: '^[0-9]+[KMGT]i?$'
storageClass:
type: string
required:
- size
backup:
type: object
properties:
enabled:
type: boolean
schedule:
type: string
pattern: '^(@(annually|yearly|monthly|weekly|daily|hourly|reboot))|(@every (\d+(ns|us|µs|ms|s|m|h))+)|((((\d+,)+\d+|(\d+([/-])\d+)|\d+|\*) ?){5,7})$'
retention:
type: integer
minimum: 1
maximum: 365
monitoring:
type: object
properties:
enabled:
type: boolean
metrics:
type: array
items:
type: string
enum:
- cpu
- memory
- disk
- connections
- queries
required:
- type
- version
- storage
status:
type: object
properties:
phase:
type: string
enum:
- Pending
- Creating
- Running
- Updating
- Deleting
- Failed
conditions:
type: array
items:
type: object
properties:
type:
type: string
status:
type: string
enum:
- "True"
- "False"
- "Unknown"
reason:
type: string
message:
type: string
lastTransitionTime:
type: string
format: date-time
endpoints:
type: object
properties:
primary:
type: string
readonly:
type: array
items:
type: string
backupStatus:
type: object
properties:
lastBackup:
type: string
format: date-time
nextBackup:
type: string
format: date-time
backupCount:
type: integer
# 子资源支持
subresources:
status: {}
# 注意:scale子资源引用的字段(.spec.replicas、.status.replicas、.status.selector)
# 必须在上面的openAPIV3Schema中定义;本例的Database schema并未定义这些字段,
# 此处仅演示scale子资源的写法
scale:
specReplicasPath: .spec.replicas
statusReplicasPath: .status.replicas
labelSelectorPath: .status.selector
# 转换Webhook
conversion:
strategy: Webhook
webhook:
clientConfig:
service:
namespace: db-system
name: db-conversion-webhook
path: /convert
conversionReviewVersions:
- v1
- v1beta1
- name: v1beta1
served: true
storage: false
deprecated: true
deprecationWarning: "db.example.com/v1beta1 Database is deprecated; use db.example.com/v1 Database"
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
x-kubernetes-preserve-unknown-fields: true
status:
type: object
x-kubernetes-preserve-unknown-fields: true
scope: Namespaced
names:
plural: databases
singular: database
kind: Database
shortNames:
- db
categories:
- all
- storage
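上面schema里的pattern字段使用正则表达式约束字符串格式。下面的Go片段用标准库regexp演示version和storage.size两个pattern的匹配效果(两个正则直接取自上面的CRD,其余代码为示意):

```go
package main

import (
	"fmt"
	"regexp"
)

// 与上面CRD schema中的pattern完全一致
var (
	versionRe = regexp.MustCompile(`^[0-9]+\.[0-9]+\.[0-9]+$`) // spec.version
	sizeRe    = regexp.MustCompile(`^[0-9]+[KMGT]i?$`)         // spec.storage.size
)

func main() {
	fmt.Println(versionRe.MatchString("13.7.0")) // true:三段式版本号
	fmt.Println(versionRe.MatchString("13.7"))   // false:缺少补丁版本号
	fmt.Println(sizeRe.MatchString("100Gi"))     // true:Kubernetes二进制单位
	fmt.Println(sizeRe.MatchString("100GB"))     // false:"GB"不符合该pattern
}
```

如果提交的Database对象中version写成"13.7",apiserver会直接以schema校验失败拒绝该请求,无须等到控制器处理。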
12.2.3 CRD验证和Webhook
# 验证Webhook配置
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: database-validator
webhooks:
- name: database.validator.example.com
clientConfig:
service:
name: database-webhook
namespace: db-system
path: /validate
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["db.example.com"]
apiVersions: ["v1"]
resources: ["databases"]
admissionReviewVersions: ["v1", "v1beta1"]
sideEffects: None
failurePolicy: Fail
---
# 变更Webhook配置
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: database-mutator
webhooks:
- name: database.mutator.example.com
clientConfig:
service:
name: database-webhook
namespace: db-system
path: /mutate
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["db.example.com"]
apiVersions: ["v1"]
resources: ["databases"]
admissionReviewVersions: ["v1", "v1beta1"]
sideEffects: None
failurePolicy: Fail
12.2.4 CRD使用示例
# 使用自定义资源
apiVersion: db.example.com/v1
kind: Database
metadata:
name: my-postgres
namespace: production
spec:
type: postgresql
version: "13.7.0"
storage:
size: 100Gi
storageClass: fast-ssd
backup:
enabled: true
schedule: "0 2 * * *" # 每天凌晨2点
retention: 30
monitoring:
enabled: true
metrics:
- cpu
- memory
- disk
- connections
- queries
---
# WebApp自定义资源示例
apiVersion: example.com/v1
kind: WebApp
metadata:
name: my-webapp
namespace: default
spec:
replicas: 3
image: nginx:1.21
port: 80
resources:
cpu: "500m"
memory: "512Mi"
12.3 Operator模式
12.3.1 Operator基础概念
Operator是一种扩展Kubernetes功能的方法,它使用自定义资源和控制器来管理应用程序。
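在阅读完整的控制器代码之前,可以先用一个与Kubernetes无关的极简Go示例感受调和循环的思想:不断对比期望状态与实际状态,每次执行一步收敛动作,直到两者一致(State、Reconcile等命名均为本示例的假设):

```go
package main

import "fmt"

// State 用一个副本数字段代表"状态",仅为演示调和思想
type State struct {
	Replicas int
}

// Reconcile 将actual向desired收敛一步,返回是否已达到期望状态
func Reconcile(desired, actual *State) bool {
	if actual.Replicas < desired.Replicas {
		actual.Replicas++ // 模拟"创建一个Pod"
		return false
	}
	if actual.Replicas > desired.Replicas {
		actual.Replicas-- // 模拟"删除一个Pod"
		return false
	}
	return true // 实际状态已等于期望状态
}

func main() {
	desired, actual := &State{Replicas: 3}, &State{Replicas: 0}
	for step := 1; !Reconcile(desired, actual); step++ {
		fmt.Printf("step %d: actual=%d\n", step, actual.Replicas)
	}
}
```

真实的Operator把"一步收敛动作"换成了创建Deployment、更新Service等API调用,并由controller-runtime在资源变化时反复触发Reconcile,核心结构与上面的循环是一样的。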
// Operator控制器示例
package main
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
webappv1 "example.com/webapp-operator/api/v1"
)
// WebAppReconciler reconciles a WebApp object
type WebAppReconciler struct {
client.Client
Scheme *runtime.Scheme
}
//+kubebuilder:rbac:groups=example.com,resources=webapps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=example.com,resources=webapps/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=example.com,resources=webapps/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// Reconcile is part of the main kubernetes reconciliation loop
func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
// 获取WebApp实例
var webapp webappv1.WebApp
if err := r.Get(ctx, req.NamespacedName, &webapp); err != nil {
if errors.IsNotFound(err) {
log.Info("WebApp resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
log.Error(err, "Failed to get WebApp")
return ctrl.Result{}, err
}
// 检查是否正在删除
if webapp.ObjectMeta.DeletionTimestamp.IsZero() {
// 添加finalizer
if !controllerutil.ContainsFinalizer(&webapp, "webapp.example.com/finalizer") {
controllerutil.AddFinalizer(&webapp, "webapp.example.com/finalizer")
return ctrl.Result{}, r.Update(ctx, &webapp)
}
} else {
// 处理删除逻辑
if controllerutil.ContainsFinalizer(&webapp, "webapp.example.com/finalizer") {
if err := r.doFinalizerOperationsForWebApp(&webapp); err != nil {
return ctrl.Result{}, err
}
controllerutil.RemoveFinalizer(&webapp, "webapp.example.com/finalizer")
return ctrl.Result{}, r.Update(ctx, &webapp)
}
return ctrl.Result{}, nil
}
// 协调Deployment
if err := r.reconcileDeployment(ctx, &webapp); err != nil {
log.Error(err, "Failed to reconcile Deployment")
return ctrl.Result{}, err
}
// 协调Service
if err := r.reconcileService(ctx, &webapp); err != nil {
log.Error(err, "Failed to reconcile Service")
return ctrl.Result{}, err
}
// 更新状态
if err := r.updateWebAppStatus(ctx, &webapp); err != nil {
log.Error(err, "Failed to update WebApp status")
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
}
// reconcileDeployment 协调Deployment资源
func (r *WebAppReconciler) reconcileDeployment(ctx context.Context, webapp *webappv1.WebApp) error {
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: webapp.Name,
Namespace: webapp.Namespace,
},
}
op, err := controllerutil.CreateOrUpdate(ctx, r.Client, deployment, func() error {
// 设置Deployment规格
deployment.Spec = appsv1.DeploymentSpec{
Replicas: &webapp.Spec.Replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": webapp.Name,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": webapp.Name,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "webapp",
Image: webapp.Spec.Image,
Ports: []corev1.ContainerPort{
{
ContainerPort: webapp.Spec.Port,
Protocol: corev1.ProtocolTCP,
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(webapp.Spec.Resources.CPU),
corev1.ResourceMemory: resource.MustParse(webapp.Spec.Resources.Memory),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(webapp.Spec.Resources.CPU),
corev1.ResourceMemory: resource.MustParse(webapp.Spec.Resources.Memory),
},
},
},
},
},
},
}
// 设置所有者引用
return controllerutil.SetControllerReference(webapp, deployment, r.Scheme)
})
if err != nil {
return err
}
if op != controllerutil.OperationResultNone {
log.FromContext(ctx).Info("Deployment reconciled", "operation", op)
}
return nil
}
// reconcileService 协调Service资源
func (r *WebAppReconciler) reconcileService(ctx context.Context, webapp *webappv1.WebApp) error {
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: webapp.Name,
Namespace: webapp.Namespace,
},
}
op, err := controllerutil.CreateOrUpdate(ctx, r.Client, service, func() error {
// 设置Service规格
service.Spec = corev1.ServiceSpec{
Selector: map[string]string{
"app": webapp.Name,
},
Ports: []corev1.ServicePort{
{
Port: 80,
TargetPort: intstr.FromInt(int(webapp.Spec.Port)),
Protocol: corev1.ProtocolTCP,
},
},
Type: corev1.ServiceTypeClusterIP,
}
// 设置所有者引用
return controllerutil.SetControllerReference(webapp, service, r.Scheme)
})
if err != nil {
return err
}
if op != controllerutil.OperationResultNone {
log.FromContext(ctx).Info("Service reconciled", "operation", op)
}
return nil
}
// updateWebAppStatus 更新WebApp状态
func (r *WebAppReconciler) updateWebAppStatus(ctx context.Context, webapp *webappv1.WebApp) error {
// 获取Deployment状态
deployment := &appsv1.Deployment{}
err := r.Get(ctx, client.ObjectKey{
Namespace: webapp.Namespace,
Name: webapp.Name,
}, deployment)
if err != nil {
return err
}
// 更新状态
webapp.Status.ReadyReplicas = deployment.Status.ReadyReplicas
if deployment.Status.ReadyReplicas == webapp.Spec.Replicas {
webapp.Status.Phase = "Running"
} else {
webapp.Status.Phase = "Pending"
}
// 更新条件
now := metav1.Now()
condition := webappv1.WebAppCondition{
Type: "Ready",
Status: "True",
LastTransitionTime: now,
Reason: "DeploymentReady",
Message: "All replicas are ready",
}
if deployment.Status.ReadyReplicas != webapp.Spec.Replicas {
condition.Status = "False"
condition.Reason = "DeploymentNotReady"
condition.Message = fmt.Sprintf("Ready replicas: %d/%d", deployment.Status.ReadyReplicas, webapp.Spec.Replicas)
}
// 更新或添加条件
webapp.Status.Conditions = updateCondition(webapp.Status.Conditions, condition)
return r.Status().Update(ctx, webapp)
}
// doFinalizerOperationsForWebApp 执行finalizer操作
func (r *WebAppReconciler) doFinalizerOperationsForWebApp(webapp *webappv1.WebApp) error {
// 在这里执行清理操作
// 例如:清理外部资源、发送通知等
log.FromContext(context.Background()).Info("Performing finalizer operations for WebApp", "name", webapp.Name)
return nil
}
// updateCondition 更新条件
func updateCondition(conditions []webappv1.WebAppCondition, newCondition webappv1.WebAppCondition) []webappv1.WebAppCondition {
for i, condition := range conditions {
if condition.Type == newCondition.Type {
if condition.Status != newCondition.Status {
conditions[i] = newCondition
}
return conditions
}
}
return append(conditions, newCondition)
}
// SetupWithManager sets up the controller with the Manager.
func (r *WebAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&webappv1.WebApp{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Complete(r)
}
12.3.2 Operator SDK使用
#!/bin/bash
# Operator SDK项目创建脚本
echo "=== Operator SDK项目创建 ==="
# 1. 初始化项目
echo "1. 初始化Operator项目:"
operator-sdk init --domain=example.com --repo=github.com/example/webapp-operator
# 2. 创建API
echo "2. 创建API:"
operator-sdk create api --group=webapp --version=v1 --kind=WebApp --resource --controller
# 3. 生成CRD
echo "3. 生成CRD:"
make manifests
# 4. 安装CRD到集群
echo "4. 安装CRD:"
make install
# 5. 运行控制器
echo "5. 运行控制器:"
make run
echo "=== Operator项目创建完成 ==="
12.3.3 Operator部署配置
# Operator部署配置
apiVersion: apps/v1
kind: Deployment
metadata:
name: webapp-operator-controller-manager
namespace: webapp-operator-system
labels:
control-plane: controller-manager
spec:
selector:
matchLabels:
control-plane: controller-manager
replicas: 1
template:
metadata:
labels:
control-plane: controller-manager
spec:
securityContext:
runAsNonRoot: true
containers:
- command:
- /manager
args:
- --leader-elect
image: webapp-operator:latest
name: manager
securityContext:
allowPrivilegeEscalation: false
livenessProbe:
httpGet:
path: /healthz
port: 8081
initialDelaySeconds: 15
periodSeconds: 20
readinessProbe:
httpGet:
path: /readyz
port: 8081
initialDelaySeconds: 5
periodSeconds: 10
resources:
limits:
cpu: 500m
memory: 128Mi
requests:
cpu: 10m
memory: 64Mi
serviceAccountName: webapp-operator-controller-manager
terminationGracePeriodSeconds: 10
---
# ServiceAccount
apiVersion: v1
kind: ServiceAccount
metadata:
name: webapp-operator-controller-manager
namespace: webapp-operator-system
---
# ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: webapp-operator-manager-role
rules:
- apiGroups:
- ""
resources:
- services
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- apps
resources:
- deployments
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- example.com
resources:
- webapps
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- example.com
resources:
- webapps/finalizers
verbs:
- update
- apiGroups:
- example.com
resources:
- webapps/status
verbs:
- get
- patch
- update
---
# ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: webapp-operator-manager-rolebinding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: webapp-operator-manager-role
subjects:
- kind: ServiceAccount
name: webapp-operator-controller-manager
namespace: webapp-operator-system
12.4 准入控制器
12.4.1 验证准入控制器
// 验证Webhook服务器
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/klog/v2"
)
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
)
type WebhookServer struct {
server *http.Server
}
// 验证Pod的Webhook处理器
func (ws *WebhookServer) validatePod(w http.ResponseWriter, r *http.Request) {
var body []byte
if r.Body != nil {
if data, err := io.ReadAll(r.Body); err == nil {
body = data
}
}
// 验证content-type
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
klog.Errorf("contentType=%s, expect application/json", contentType)
http.Error(w, "invalid Content-Type, want `application/json`", http.StatusUnsupportedMediaType)
return
}
var admissionResponse *admissionv1.AdmissionResponse
ar := admissionv1.AdmissionReview{}
if _, _, err := codecs.UniversalDeserializer().Decode(body, nil, &ar); err != nil {
klog.Errorf("Could not decode body: %v", err)
admissionResponse = &admissionv1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
} else {
admissionResponse = ws.validatePodAdmission(ar.Request)
}
// admission/v1的响应必须携带apiVersion和kind,否则apiserver会拒绝该响应
admissionReview := admissionv1.AdmissionReview{
TypeMeta: metav1.TypeMeta{APIVersion: "admission.k8s.io/v1", Kind: "AdmissionReview"},
}
if admissionResponse != nil {
admissionReview.Response = admissionResponse
if ar.Request != nil {
admissionReview.Response.UID = ar.Request.UID
}
}
respBytes, _ := json.Marshal(admissionReview)
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(respBytes); err != nil {
klog.Errorf("Could not write response: %v", err)
}
}
// 验证Pod准入逻辑
func (ws *WebhookServer) validatePodAdmission(req *admissionv1.AdmissionRequest) *admissionv1.AdmissionResponse {
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
klog.Errorf("Could not unmarshal raw object: %v", err)
return &admissionv1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}
allowed := true
var result *metav1.Status
message := ""
// 验证规则1: 检查镜像来源
for _, container := range pod.Spec.Containers {
if !ws.isAllowedRegistry(container.Image) {
allowed = false
message = fmt.Sprintf("Image %s is not from allowed registry", container.Image)
break
}
}
// 验证规则2: 检查安全上下文
if allowed {
if pod.Spec.SecurityContext != nil {
// PodSecurityContext没有RunAsRoot字段,通过RunAsUser判断是否以root(UID 0)运行
if pod.Spec.SecurityContext.RunAsUser != nil && *pod.Spec.SecurityContext.RunAsUser == 0 {
allowed = false
message = "Running as root (UID 0) is not allowed"
}
}
// 检查容器安全上下文
for _, container := range pod.Spec.Containers {
if container.SecurityContext != nil {
if container.SecurityContext.Privileged != nil && *container.SecurityContext.Privileged {
allowed = false
message = "Privileged containers are not allowed"
break
}
if container.SecurityContext.RunAsUser != nil && *container.SecurityContext.RunAsUser == 0 {
allowed = false
message = "Running as root (UID 0) is not allowed"
break
}
}
}
}
// 验证规则3: 检查资源限制
if allowed {
for _, container := range pod.Spec.Containers {
if container.Resources.Limits == nil {
allowed = false
message = "Resource limits must be specified"
break
}
if container.Resources.Requests == nil {
allowed = false
message = "Resource requests must be specified"
break
}
}
}
// 验证规则4: 检查标签
if allowed {
if pod.Labels == nil {
allowed = false
message = "Pod must have labels"
} else {
requiredLabels := []string{"app", "version", "environment"}
for _, label := range requiredLabels {
if _, exists := pod.Labels[label]; !exists {
allowed = false
message = fmt.Sprintf("Required label '%s' is missing", label)
break
}
}
}
}
if !allowed {
result = &metav1.Status{
Message: message,
}
}
return &admissionv1.AdmissionResponse{
Allowed: allowed,
Result: result,
}
}
// 检查镜像注册表是否被允许
func (ws *WebhookServer) isAllowedRegistry(image string) bool {
allowedRegistries := []string{
"docker.io",
"gcr.io",
"quay.io",
"registry.k8s.io",
"your-private-registry.com",
}
for _, registry := range allowedRegistries {
if strings.HasPrefix(image, registry) {
return true
}
}
// 如果没有指定注册表,默认为docker.io
if !strings.Contains(image, "/") || !strings.Contains(strings.Split(image, "/")[0], ".") {
return true
}
return false
}
func main() {
certPath := "/etc/certs/tls.crt"
keyPath := "/etc/certs/tls.key"
mux := http.NewServeMux()
webhookServer := &WebhookServer{
server: &http.Server{
Addr: ":8443",
Handler: mux,
},
}
mux.HandleFunc("/validate", webhookServer.validatePod)
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})
klog.Info("Starting webhook server...")
if err := webhookServer.server.ListenAndServeTLS(certPath, keyPath); err != nil {
klog.Fatalf("Failed to start webhook server: %v", err)
}
}
12.4.2 变更准入控制器
// 变更Webhook处理器
func (ws *WebhookServer) mutatePod(w http.ResponseWriter, r *http.Request) {
var body []byte
if r.Body != nil {
if data, err := io.ReadAll(r.Body); err == nil {
body = data
}
}
var admissionResponse *admissionv1.AdmissionResponse
ar := admissionv1.AdmissionReview{}
if _, _, err := codecs.UniversalDeserializer().Decode(body, nil, &ar); err != nil {
klog.Errorf("Could not decode body: %v", err)
admissionResponse = &admissionv1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
} else {
admissionResponse = ws.mutatePodAdmission(ar.Request)
}
// admission/v1的响应必须携带apiVersion和kind,否则apiserver会拒绝该响应
admissionReview := admissionv1.AdmissionReview{
TypeMeta: metav1.TypeMeta{APIVersion: "admission.k8s.io/v1", Kind: "AdmissionReview"},
}
if admissionResponse != nil {
admissionReview.Response = admissionResponse
if ar.Request != nil {
admissionReview.Response.UID = ar.Request.UID
}
}
respBytes, _ := json.Marshal(admissionReview)
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(respBytes); err != nil {
klog.Errorf("Could not write response: %v", err)
}
}
// 变更Pod准入逻辑
func (ws *WebhookServer) mutatePodAdmission(req *admissionv1.AdmissionRequest) *admissionv1.AdmissionResponse {
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
klog.Errorf("Could not unmarshal raw object: %v", err)
return &admissionv1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}
var patches []map[string]interface{}
// 变更1: 添加默认标签
if pod.Labels == nil {
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/metadata/labels",
"value": map[string]string{},
})
}
// 添加默认标签
defaultLabels := map[string]string{
"managed-by": "admission-webhook",
"version": "v1.0.0",
}
for key, value := range defaultLabels {
if pod.Labels == nil || pod.Labels[key] == "" {
patches = append(patches, map[string]interface{}{
"op": "add",
"path": fmt.Sprintf("/metadata/labels/%s", strings.ReplaceAll(strings.ReplaceAll(key, "~", "~0"), "/", "~1")), // 按RFC 6901先转义"~"再转义"/"
"value": value,
})
}
}
// 变更2: 添加默认注解
if pod.Annotations == nil {
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/metadata/annotations",
"value": map[string]string{},
})
}
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/metadata/annotations/mutated-by",
"value": "admission-webhook",
})
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/metadata/annotations/mutation-timestamp",
"value": time.Now().Format(time.RFC3339),
})
// 变更3: 添加sidecar容器
if ws.shouldInjectSidecar(&pod) {
sidecarContainer := corev1.Container{
Name: "logging-sidecar",
Image: "fluent/fluent-bit:1.9",
VolumeMounts: []corev1.VolumeMount{
{
Name: "varlog",
MountPath: "/var/log",
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
corev1.ResourceMemory: resource.MustParse("128Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("200m"),
corev1.ResourceMemory: resource.MustParse("256Mi"),
},
},
}
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/spec/containers/-",
"value": sidecarContainer,
})
// 添加共享卷
if len(pod.Spec.Volumes) == 0 {
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/spec/volumes",
"value": []corev1.Volume{},
})
}
logVolume := corev1.Volume{
Name: "varlog",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
}
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/spec/volumes/-",
"value": logVolume,
})
}
// 变更4: 设置默认安全上下文
for i, container := range pod.Spec.Containers {
if container.SecurityContext == nil {
patches = append(patches, map[string]interface{}{
"op": "add",
"path": fmt.Sprintf("/spec/containers/%d/securityContext", i),
"value": &corev1.SecurityContext{
RunAsNonRoot: &[]bool{true}[0],
RunAsUser: &[]int64{1000}[0],
AllowPrivilegeEscalation: &[]bool{false}[0],
ReadOnlyRootFilesystem: &[]bool{true}[0],
Capabilities: &corev1.Capabilities{
Drop: []corev1.Capability{"ALL"},
},
},
})
}
}
patchBytes, _ := json.Marshal(patches)
return &admissionv1.AdmissionResponse{
Allowed: true,
Patch: patchBytes,
PatchType: func() *admissionv1.PatchType {
pt := admissionv1.PatchTypeJSONPatch
return &pt
}(),
}
}
// 判断是否应该注入sidecar
func (ws *WebhookServer) shouldInjectSidecar(pod *corev1.Pod) bool {
// 检查注解
if pod.Annotations != nil {
if inject, exists := pod.Annotations["sidecar.example.com/inject"]; exists {
return inject == "true"
}
}
// 检查命名空间标签
// 这里简化处理,实际应该查询命名空间
if strings.HasPrefix(pod.Namespace, "prod-") {
return true
}
return false
}
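上面构造JSON Patch路径时对标签key做了转义。按RFC 6901,JSON Pointer片段必须先把"~"转义为"~0",再把"/"转义为"~1",顺序不能颠倒,否则转义产物中的"~1"会被二次破坏。下面是一个独立的示意函数(EscapeJSONPointer为本示例假设的命名):

```go
package main

import (
	"fmt"
	"strings"
)

// EscapeJSONPointer 按RFC 6901转义JSON Pointer的单个片段:
// 先"~" -> "~0",再"/" -> "~1",顺序不能颠倒
func EscapeJSONPointer(segment string) string {
	segment = strings.ReplaceAll(segment, "~", "~0")
	return strings.ReplaceAll(segment, "/", "~1")
}

func main() {
	// Kubernetes里常见的带"/"的标签key,放进patch路径前必须转义
	fmt.Println("/metadata/labels/" + EscapeJSONPointer("app.kubernetes.io/name"))
}
```

例如标签key为app.kubernetes.io/name时,patch路径应写成/metadata/labels/app.kubernetes.io~1name,apiserver解码时会把"~1"还原为"/"。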
12.4.3 Webhook部署配置
# Webhook部署配置
apiVersion: apps/v1
kind: Deployment
metadata:
name: admission-webhook
namespace: webhook-system
spec:
replicas: 2
selector:
matchLabels:
app: admission-webhook
template:
metadata:
labels:
app: admission-webhook
spec:
serviceAccountName: admission-webhook
containers:
- name: webhook
image: admission-webhook:latest
ports:
- containerPort: 8443
name: webhook-api
volumeMounts:
- name: webhook-tls-certs
mountPath: /etc/certs
readOnly: true
env:
- name: TLS_CERT_FILE
value: /etc/certs/tls.crt
- name: TLS_PRIVATE_KEY_FILE
value: /etc/certs/tls.key
livenessProbe:
httpGet:
path: /health
port: 8443
scheme: HTTPS
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8443
scheme: HTTPS
initialDelaySeconds: 5
periodSeconds: 5
resources:
limits:
cpu: 500m
memory: 128Mi
requests:
cpu: 250m
memory: 64Mi
volumes:
- name: webhook-tls-certs
secret:
secretName: webhook-tls-certs
---
apiVersion: v1
kind: Service
metadata:
name: admission-webhook
namespace: webhook-system
spec:
selector:
app: admission-webhook
ports:
- name: webhook-api
port: 443
targetPort: 8443
protocol: TCP
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: admission-webhook
namespace: webhook-system
12.5 调度器扩展
12.5.1 调度框架插件
// 自定义调度器插件
package main
import (
"context"
"fmt"
"math"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
// NodeResourceFit 插件名称
const NodeResourceFit = "NodeResourceFit"
// NodeResourceFitArgs 插件参数
type NodeResourceFitArgs struct {
// CPU权重
CPUWeight int64 `json:"cpuWeight,omitempty"`
// 内存权重
MemoryWeight int64 `json:"memoryWeight,omitempty"`
// 存储权重
StorageWeight int64 `json:"storageWeight,omitempty"`
}
// NodeResourceFitPlugin 自定义调度插件
type NodeResourceFitPlugin struct {
args *NodeResourceFitArgs
handle framework.Handle
}
// Name 返回插件名称
func (pl *NodeResourceFitPlugin) Name() string {
return NodeResourceFit
}
// Filter 过滤阶段:检查节点是否有足够资源
func (pl *NodeResourceFitPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
node := nodeInfo.Node()
if node == nil {
return framework.NewStatus(framework.Error, "node not found")
}
// 计算Pod资源需求
podRequest := pl.calculatePodResourceRequest(pod)
// 计算节点可用资源
nodeAllocatable := pl.calculateNodeAllocatable(nodeInfo)
// 检查CPU
if podRequest.MilliCPU > nodeAllocatable.MilliCPU {
return framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("Insufficient CPU: requested %dm, available %dm",
podRequest.MilliCPU, nodeAllocatable.MilliCPU))
}
// 检查内存
if podRequest.Memory > nodeAllocatable.Memory {
return framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("Insufficient memory: requested %d, available %d",
podRequest.Memory, nodeAllocatable.Memory))
}
// 检查存储
if podRequest.EphemeralStorage > nodeAllocatable.EphemeralStorage {
return framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("Insufficient storage: requested %d, available %d",
podRequest.EphemeralStorage, nodeAllocatable.EphemeralStorage))
}
return nil
}
// Score 评分阶段:根据资源使用率给节点评分
func (pl *NodeResourceFitPlugin) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
// 计算Pod资源需求
podRequest := pl.calculatePodResourceRequest(pod)
// 计算节点可用资源
nodeAllocatable := pl.calculateNodeAllocatable(nodeInfo)
// 计算资源利用率评分
cpuScore := pl.calculateResourceScore(podRequest.MilliCPU, nodeAllocatable.MilliCPU)
memoryScore := pl.calculateResourceScore(podRequest.Memory, nodeAllocatable.Memory)
storageScore := pl.calculateResourceScore(podRequest.EphemeralStorage, nodeAllocatable.EphemeralStorage)
// 加权平均(使用局部变量,避免在可能被并发调用的Score中修改共享的pl.args)
cpuWeight, memoryWeight, storageWeight := pl.args.CPUWeight, pl.args.MemoryWeight, pl.args.StorageWeight
totalWeight := cpuWeight + memoryWeight + storageWeight
if totalWeight == 0 {
cpuWeight, memoryWeight, storageWeight = 1, 1, 1 // 默认权重
totalWeight = 3
}
score := (cpuScore*cpuWeight + memoryScore*memoryWeight + storageScore*storageWeight) / totalWeight
klog.V(4).Infof("Node %s score: CPU=%d, Memory=%d, Storage=%d, Final=%d",
nodeName, cpuScore, memoryScore, storageScore, score)
return score, nil
}
// PreFilter 预过滤阶段:预处理Pod信息
func (pl *NodeResourceFitPlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
// 在状态中存储Pod资源需求,避免重复计算
podRequest := pl.calculatePodResourceRequest(pod)
state.Write("podResourceRequest", podRequest)
return nil
}
// PreFilterExtensions 返回预过滤扩展
func (pl *NodeResourceFitPlugin) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
// 资源需求结构(需实现framework.StateData接口,才能通过state.Write写入CycleState)
type ResourceRequest struct {
MilliCPU int64
Memory int64
EphemeralStorage int64
}

// Clone 实现framework.StateData接口,CycleState被拷贝时调用
func (r *ResourceRequest) Clone() framework.StateData {
c := *r
return &c
}
// 计算Pod资源需求
func (pl *NodeResourceFitPlugin) calculatePodResourceRequest(pod *v1.Pod) *ResourceRequest {
request := &ResourceRequest{}
for _, container := range pod.Spec.Containers {
if container.Resources.Requests != nil {
if cpu := container.Resources.Requests[v1.ResourceCPU]; !cpu.IsZero() {
request.MilliCPU += cpu.MilliValue()
}
if memory := container.Resources.Requests[v1.ResourceMemory]; !memory.IsZero() {
request.Memory += memory.Value()
}
if storage := container.Resources.Requests[v1.ResourceEphemeralStorage]; !storage.IsZero() {
request.EphemeralStorage += storage.Value()
}
}
}
return request
}
// 计算节点可用资源
func (pl *NodeResourceFitPlugin) calculateNodeAllocatable(nodeInfo *framework.NodeInfo) *ResourceRequest {
allocatable := &ResourceRequest{}
node := nodeInfo.Node()
if node.Status.Allocatable != nil {
if cpu := node.Status.Allocatable[v1.ResourceCPU]; !cpu.IsZero() {
allocatable.MilliCPU = cpu.MilliValue()
}
if memory := node.Status.Allocatable[v1.ResourceMemory]; !memory.IsZero() {
allocatable.Memory = memory.Value()
}
if storage := node.Status.Allocatable[v1.ResourceEphemeralStorage]; !storage.IsZero() {
allocatable.EphemeralStorage = storage.Value()
}
}
// 减去已分配的资源
for _, podInfo := range nodeInfo.Pods {
pod := podInfo.Pod
for _, container := range pod.Spec.Containers {
if container.Resources.Requests != nil {
if cpu := container.Resources.Requests[v1.ResourceCPU]; !cpu.IsZero() {
allocatable.MilliCPU -= cpu.MilliValue()
}
if memory := container.Resources.Requests[v1.ResourceMemory]; !memory.IsZero() {
allocatable.Memory -= memory.Value()
}
if storage := container.Resources.Requests[v1.ResourceEphemeralStorage]; !storage.IsZero() {
allocatable.EphemeralStorage -= storage.Value()
}
}
}
}
return allocatable
}
// 计算资源评分(0-100)
func (pl *NodeResourceFitPlugin) calculateResourceScore(requested, available int64) int64 {
if available <= 0 {
return 0
}
if requested <= 0 {
return 100
}
// 计算剩余资源比例
remaining := available - requested
if remaining < 0 {
return 0
}
// 评分:剩余资源越多,评分越高
score := int64(math.Round(float64(remaining) / float64(available) * 100))
if score > 100 {
score = 100
}
return score
}
// New creates a plugin instance from the (optional) per-plugin args.
func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) {
nodeResourceFitArgs := &NodeResourceFitArgs{
CPUWeight:     1,
MemoryWeight:  1,
StorageWeight: 1,
}
if args != nil {
// DecodeInto comes from k8s.io/kubernetes/pkg/scheduler/framework/runtime
// (imported here as frameworkruntime). The previous approach of passing
// (*runtime.Unknown).Raw to FromUnstructured does not compile: that
// converter expects a map[string]interface{}, not raw bytes.
if err := frameworkruntime.DecodeInto(args, nodeResourceFitArgs); err != nil {
return nil, fmt.Errorf("failed to decode args into NodeResourceFitArgs: %v", err)
}
}
return &NodeResourceFitPlugin{
args:   nodeResourceFitArgs,
handle: handle,
}, nil
}
// Compile-time checks that the plugin implements the required interfaces
var _ framework.FilterPlugin = &NodeResourceFitPlugin{}
var _ framework.ScorePlugin = &NodeResourceFitPlugin{}
var _ framework.PreFilterPlugin = &NodeResourceFitPlugin{}
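The arithmetic in `calculateResourceScore` and the weighted average in `Score` can be exercised in isolation. The sketch below re-implements both as standalone functions (the names `resourceScore` and `weightedScore` are local to this example, not part of the plugin):

```go
package main

import (
	"fmt"
	"math"
)

// resourceScore mirrors calculateResourceScore: the more resources
// remain on the node after placing the Pod, the higher the score (0-100).
func resourceScore(requested, available int64) int64 {
	if available <= 0 {
		return 0
	}
	if requested <= 0 {
		return 100
	}
	remaining := available - requested
	if remaining < 0 {
		return 0
	}
	return int64(math.Round(float64(remaining) / float64(available) * 100))
}

// weightedScore combines per-resource scores the same way Score does.
func weightedScore(cpu, mem, storage, wCPU, wMem, wStorage int64) int64 {
	return (cpu*wCPU + mem*wMem + storage*wStorage) / (wCPU + wMem + wStorage)
}

func main() {
	cpu := resourceScore(500, 4000)    // request 500m of a 4-core node
	mem := resourceScore(1<<30, 8<<30) // request 1Gi of 8Gi
	fmt.Println(cpu, mem, weightedScore(cpu, mem, 100, 3, 2, 1))
}
```

With weights 3/2/1, a node with plenty of free storage is favored less than one with free CPU, matching the `pluginConfig` shown in the next section.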
12.5.2 Scheduler Configuration
# Custom scheduler configuration
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
- schedulerName: custom-scheduler
plugins:
# PreFilter phase
preFilter:
enabled:
- name: NodeResourceFit
- name: NodeAffinity
- name: PodTopologySpread
disabled:
- name: NodeResourcesFit # disable the default in-tree plugin
# Filter phase
filter:
enabled:
- name: NodeResourceFit
- name: NodeAffinity
- name: PodTopologySpread
- name: TaintToleration
disabled:
- name: NodeResourcesFit
# Score phase
score:
enabled:
- name: NodeResourceFit
weight: 50
- name: NodeAffinity
weight: 30
- name: PodTopologySpread
weight: 20
disabled:
- name: NodeResourcesFit
# Per-plugin configuration
pluginConfig:
- name: NodeResourceFit
args:
cpuWeight: 3
memoryWeight: 2
storageWeight: 1
- name: PodTopologySpread
args:
defaultConstraints:
- maxSkew: 1
topologyKey: kubernetes.io/hostname
whenUnsatisfiable: DoNotSchedule
- maxSkew: 1
topologyKey: topology.kubernetes.io/zone
whenUnsatisfiable: ScheduleAnyway
---
# Scheduler deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: custom-scheduler
namespace: kube-system
spec:
replicas: 1
selector:
matchLabels:
app: custom-scheduler
template:
metadata:
labels:
app: custom-scheduler
spec:
serviceAccountName: custom-scheduler
containers:
- name: kube-scheduler
image: registry.k8s.io/kube-scheduler:v1.28.0
command:
- kube-scheduler
- --config=/etc/kubernetes/scheduler-config.yaml
- --v=2
volumeMounts:
- name: config
mountPath: /etc/kubernetes
- name: plugin
mountPath: /opt/plugins
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 512Mi
volumes:
- name: config
configMap:
name: scheduler-config
- name: plugin
hostPath:
path: /opt/scheduler-plugins
---
# Scheduler ServiceAccount
apiVersion: v1
kind: ServiceAccount
metadata:
name: custom-scheduler
namespace: kube-system
---
# Scheduler ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: custom-scheduler
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: system:kube-scheduler
subjects:
- kind: ServiceAccount
name: custom-scheduler
namespace: kube-system
12.5.3 Using the Custom Scheduler
# Pod scheduled by the custom scheduler
apiVersion: v1
kind: Pod
metadata:
name: test-custom-scheduler
spec:
schedulerName: custom-scheduler # select the custom scheduler
containers:
- name: app
image: nginx:1.21
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 200m
memory: 256Mi
---
# Deployment using the custom scheduler
apiVersion: apps/v1
kind: Deployment
metadata:
name: app-with-custom-scheduler
spec:
replicas: 3
selector:
matchLabels:
app: custom-scheduled-app
template:
metadata:
labels:
app: custom-scheduled-app
spec:
schedulerName: custom-scheduler
containers:
- name: app
image: nginx:1.21
resources:
requests:
cpu: 200m
memory: 256Mi
limits:
cpu: 500m
memory: 512Mi
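In the score phase, the scheduler multiplies each plugin's (normalized) node score by the weight configured in the profile and sums the results. A standalone sketch of that combination, using hypothetical per-plugin scores and the 50/30/20 weights from the configuration above:

```go
package main

import "fmt"

// combinePluginScores sketches how kube-scheduler aggregates score-phase
// results: sum over plugins of (node score x configured weight).
func combinePluginScores(scores, weights map[string]int64) int64 {
	var total int64
	for name, s := range scores {
		total += s * weights[name]
	}
	return total
}

func main() {
	// Hypothetical normalized scores for one node.
	scores := map[string]int64{
		"NodeResourceFit":   80,
		"NodeAffinity":      100,
		"PodTopologySpread": 60,
	}
	weights := map[string]int64{
		"NodeResourceFit":   50,
		"NodeAffinity":      30,
		"PodTopologySpread": 20,
	}
	fmt.Println(combinePluginScores(scores, weights))
}
```

The node with the highest combined total wins, so raising a plugin's weight amplifies its influence on placement.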
12.6 Network Plugin Development
12.6.1 CNI Plugin Basics
// A simple CNI plugin example
package main
import (
"encoding/json"
"fmt"
"net"
"os"
"runtime"
"github.com/containernetworking/cni/pkg/skel"
"github.com/containernetworking/cni/pkg/types"
current "github.com/containernetworking/cni/pkg/types/100"
"github.com/containernetworking/cni/pkg/version"
"github.com/containernetworking/plugins/pkg/ip"
"github.com/containernetworking/plugins/pkg/ipam"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/vishvananda/netlink"
)
// NetConf is the CNI network configuration read from stdin
type NetConf struct {
types.NetConf
Bridge string `json:"bridge"`
IsGW bool `json:"isGateway"`
IsDefaultGW bool `json:"isDefaultGateway"`
ForceAddress bool `json:"forceAddress"`
IPMasq bool `json:"ipMasq"`
MTU int `json:"mtu"`
HairpinMode bool `json:"hairpinMode"`
}
// cmdAdd 添加网络接口
func cmdAdd(args *skel.CmdArgs) error {
n, cniVersion, err := loadNetConf(args.StdinData)
if err != nil {
return err
}
// 创建或获取网桥
br, err := setupBridge(n)
if err != nil {
return err
}
netns, err := ns.GetNS(args.Netns)
if err != nil {
return fmt.Errorf("failed to open netns %q: %v", args.Netns, err)
}
defer netns.Close()
// Allocate an IP address via the delegated IPAM plugin
r, err := ipam.ExecAdd(n.IPAM.Type, args.StdinData)
if err != nil {
return err
}
// ipam.ExecAdd returns the generic types.Result interface; convert it to
// the concrete current.Result to access IPs/Routes/DNS fields.
ipamResult, err := current.NewResultFromResult(r)
if err != nil {
return err
}
// Create the veth pair
hostInterface, containerInterface, err := setupVeth(netns, br, args.IfName, n.MTU, n.HairpinMode)
if err != nil {
return err
}
// Configure the interface inside the container's network namespace
err = netns.Do(func(hostNS ns.NetNS) error {
return configureInterface(args.IfName, ipamResult.IPs, ipamResult.Routes)
})
if err != nil {
return err
}
// Assign the gateway address to the bridge if it acts as gateway
if n.IsGW {
if err := ensureBridgeAddr(br, ipamResult.IPs, n.ForceAddress); err != nil {
return err
}
}
// Enable IP forwarding and masquerading
if n.IPMasq {
if err := setupIPMasq(ipamResult.IPs); err != nil {
return err
}
}
result := &current.Result{
CNIVersion: cniVersion,
IPs:        ipamResult.IPs,
Routes:     ipamResult.Routes,
DNS:        ipamResult.DNS,
Interfaces: []*current.Interface{hostInterface, containerInterface},
}
return types.PrintResult(result, cniVersion)
}
// cmdDel 删除网络接口
func cmdDel(args *skel.CmdArgs) error {
n, _, err := loadNetConf(args.StdinData)
if err != nil {
return err
}
// 释放IP地址
err = ipam.ExecDel(n.IPAM.Type, args.StdinData)
if err != nil {
return err
}
// 删除veth接口
if args.Netns == "" {
return nil
}
err = ns.WithNetNSPath(args.Netns, func(hostNS ns.NetNS) error {
return ip.DelLinkByName(args.IfName)
})
if err != nil && err != ip.ErrLinkNotFound {
return err
}
return nil
}
// cmdCheck 检查网络接口
func cmdCheck(args *skel.CmdArgs) error {
// 实现网络接口检查逻辑
return nil
}
// loadNetConf 加载网络配置
func loadNetConf(bytes []byte) (*NetConf, string, error) {
n := &NetConf{}
if err := json.Unmarshal(bytes, n); err != nil {
return nil, "", fmt.Errorf("failed to load netconf: %v", err)
}
return n, n.CNIVersion, nil
}
// setupBridge 设置网桥
func setupBridge(n *NetConf) (*netlink.Bridge, error) {
// 检查网桥是否存在
l, err := netlink.LinkByName(n.Bridge)
if err != nil {
// 创建网桥
br := &netlink.Bridge{
LinkAttrs: netlink.LinkAttrs{
Name: n.Bridge,
MTU: n.MTU,
},
}
if err := netlink.LinkAdd(br); err != nil {
return nil, fmt.Errorf("could not add %q: %v", n.Bridge, err)
}
l = br
}
br, ok := l.(*netlink.Bridge)
if !ok {
return nil, fmt.Errorf("%q already exists but is not a bridge", n.Bridge)
}
// 启动网桥
if err := netlink.LinkSetUp(br); err != nil {
return nil, err
}
return br, nil
}
// setupVeth creates the veth pair and attaches the host end to the bridge
func setupVeth(netns ns.NetNS, br *netlink.Bridge, ifName string, mtu int, hairpinMode bool) (*current.Interface, *current.Interface, error) {
hostIface := &current.Interface{}
containerIface := &current.Interface{}
err := netns.Do(func(hostNS ns.NetNS) error {
// 创建veth pair
hostVeth, containerVeth, err := ip.SetupVeth(ifName, mtu, hostNS)
if err != nil {
return err
}
hostIface.Name = hostVeth.Name
hostIface.Mac = hostVeth.HardwareAddr.String()
containerIface.Name = containerVeth.Name
containerIface.Mac = containerVeth.HardwareAddr.String()
containerIface.Sandbox = netns.Path()
// 将host端veth添加到网桥
if err := netlink.LinkSetMaster(hostVeth, br); err != nil {
return fmt.Errorf("failed to connect %q to bridge %v: %v", hostVeth.Name, br.Attrs().Name, err)
}
// 设置hairpin模式
if hairpinMode {
if err = netlink.LinkSetHairpin(hostVeth, true); err != nil {
return fmt.Errorf("failed to setup hairpin mode for %v: %v", hostVeth.Name, err)
}
}
return nil
})
return hostIface, containerIface, err
}
// configureInterface assigns addresses and routes to the container-side interface
func configureInterface(ifName string, ips []*current.IPConfig, routes []*types.Route) error {
link, err := netlink.LinkByName(ifName)
if err != nil {
return fmt.Errorf("failed to lookup %q: %v", ifName, err)
}
// 配置IP地址
for _, ipc := range ips {
addr := &netlink.Addr{IPNet: &ipc.Address, Label: ""}
if err = netlink.AddrAdd(link, addr); err != nil {
return fmt.Errorf("failed to add IP addr %v to %q: %v", ipc.Address, ifName, err)
}
}
// 启动接口
if err = netlink.LinkSetUp(link); err != nil {
return fmt.Errorf("failed to set %q UP: %v", ifName, err)
}
// 配置路由
for _, r := range routes {
route := &netlink.Route{
LinkIndex: link.Attrs().Index,
Scope: netlink.SCOPE_UNIVERSE,
Dst: &r.Dst,
Gw: r.GW,
}
if err = netlink.RouteAdd(route); err != nil {
return fmt.Errorf("failed to add route %v: %v", r, err)
}
}
return nil
}
// ensureBridgeAddr makes sure the bridge carries the gateway address
func ensureBridgeAddr(br *netlink.Bridge, ips []*current.IPConfig, forceAddress bool) error {
addrs, err := netlink.AddrList(br, netlink.FAMILY_V4)
if err != nil && err != netlink.ErrNotImplemented {
return fmt.Errorf("could not get list of IP addresses: %v", err)
}
// 如果网桥已有地址且不强制设置,则跳过
if len(addrs) > 0 && !forceAddress {
return nil
}
// 为网桥设置网关地址
for _, ipc := range ips {
gatewayAddr := &net.IPNet{
IP: ipc.Gateway,
Mask: ipc.Address.Mask,
}
addr := &netlink.Addr{IPNet: gatewayAddr, Label: ""}
if err = netlink.AddrAdd(br, addr); err != nil {
return fmt.Errorf("failed to add IP addr %v to %q: %v", gatewayAddr, br.Attrs().Name, err)
}
}
return nil
}
// setupIPMasq enables IP forwarding (and, in a full plugin, masquerade rules)
func setupIPMasq(ips []*current.IPConfig) error {
// 启用IP转发
if err := ip.EnableIP4Forward(); err != nil {
return fmt.Errorf("failed to enable forwarding: %v", err)
}
// 这里应该设置iptables规则,简化处理
// 实际实现需要使用iptables库
return nil
}
func main() {
skel.PluginMain(cmdAdd, cmdCheck, cmdDel, version.All, "Simple CNI plugin v0.1.0")
}
func init() {
// 锁定OS线程
runtime.LockOSThread()
}
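Before looking at a concrete configuration file, it helps to see what `loadNetConf` actually does with the bytes it receives on stdin. The sketch below uses a simplified local `netConf` struct (the real `NetConf` embeds `types.NetConf`; the fields here are a hypothetical subset for illustration):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// netConf is a simplified stand-in for the plugin's NetConf, covering
// only the fields this example inspects.
type netConf struct {
	CNIVersion string `json:"cniVersion"`
	Name       string `json:"name"`
	Type       string `json:"type"`
	Bridge     string `json:"bridge"`
	IsGateway  bool   `json:"isGateway"`
	IPMasq     bool   `json:"ipMasq"`
	MTU        int    `json:"mtu"`
}

// parseNetConf mirrors loadNetConf: unmarshal stdin bytes into the struct.
func parseNetConf(data []byte) (*netConf, error) {
	n := &netConf{}
	if err := json.Unmarshal(data, n); err != nil {
		return nil, fmt.Errorf("failed to load netconf: %v", err)
	}
	return n, nil
}

func main() {
	conf := []byte(`{"cniVersion":"0.4.0","name":"simple-bridge","type":"simple-bridge","bridge":"cni0","isGateway":true,"ipMasq":true,"mtu":1500}`)
	n, err := parseNetConf(conf)
	if err != nil {
		panic(err)
	}
	fmt.Println(n.Bridge, n.MTU, n.IPMasq)
}
```

Unknown JSON keys (such as the `ipam` block) are simply ignored by `json.Unmarshal`, which is why the real plugin re-passes the raw stdin bytes to the IPAM delegate.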
12.6.2 CNI Plugin Configuration
{
"cniVersion": "0.4.0",
"name": "simple-bridge",
"type": "simple-bridge",
"bridge": "cni0",
"isGateway": true,
"isDefaultGateway": true,
"forceAddress": false,
"ipMasq": true,
"mtu": 1500,
"hairpinMode": true,
"ipam": {
"type": "host-local",
"subnet": "10.244.0.0/16",
"routes": [
{
"dst": "0.0.0.0/0"
}
]
}
}
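With `host-local` IPAM and subnet `10.244.0.0/16`, the first usable address (`10.244.0.1`) is conventionally handed to the bridge as the gateway. A minimal sketch of that computation (hypothetical helper, no octet-overflow handling):

```go
package main

import (
	"fmt"
	"net"
)

// firstHostIP returns the first usable address in an IPv4 subnet --
// the address conventionally used as the gateway (network address + 1).
// Simplified: it only increments the last octet.
func firstHostIP(cidr string) (string, error) {
	_, ipnet, err := net.ParseCIDR(cidr)
	if err != nil {
		return "", err
	}
	ip := ipnet.IP.To4()
	if ip == nil {
		return "", fmt.Errorf("not an IPv4 subnet: %s", cidr)
	}
	// Copy before mutating: To4 may alias the parsed address.
	first := make(net.IP, len(ip))
	copy(first, ip)
	first[3]++
	return first.String(), nil
}

func main() {
	gw, err := firstHostIP("10.244.0.0/16")
	if err != nil {
		panic(err)
	}
	fmt.Println(gw)
}
```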
# CNI plugin DaemonSet deployment
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: simple-cni
namespace: kube-system
spec:
selector:
matchLabels:
app: simple-cni
template:
metadata:
labels:
app: simple-cni
spec:
hostNetwork: true
tolerations:
- operator: Exists
effect: NoSchedule
serviceAccountName: simple-cni
containers:
- name: install-cni
image: simple-cni:latest
command: ["/install-cni.sh"]
env:
- name: CNI_NETWORK_CONFIG
valueFrom:
configMapKeyRef:
name: simple-cni-config
key: cni_network_config
volumeMounts:
- name: cni-bin-dir
mountPath: /host/opt/cni/bin
- name: cni-net-dir
mountPath: /host/etc/cni/net.d
securityContext:
privileged: true
volumes:
- name: cni-bin-dir
hostPath:
path: /opt/cni/bin
- name: cni-net-dir
hostPath:
path: /etc/cni/net.d
---
apiVersion: v1
kind: ConfigMap
metadata:
name: simple-cni-config
namespace: kube-system
data:
cni_network_config: |
{
"cniVersion": "0.4.0",
"name": "simple-bridge",
"type": "simple-bridge",
"bridge": "cni0",
"isGateway": true,
"isDefaultGateway": true,
"forceAddress": false,
"ipMasq": true,
"mtu": 1500,
"hairpinMode": true,
"ipam": {
"type": "host-local",
"subnet": "10.244.0.0/16",
"routes": [
{
"dst": "0.0.0.0/0"
}
]
}
}
12.7 Storage Plugin Development
12.7.1 Developing a CSI Driver
// Example CSI driver
package main
import (
"context"
"fmt"
"os"
"path/filepath"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
)
// Driver CSI驱动结构
type Driver struct {
name string
version string
nodeID string
// 服务能力
vcap map[csi.VolumeCapability_AccessMode_Mode]bool
cscap []*csi.ControllerServiceCapability
nscap []*csi.NodeServiceCapability
}
// NewDriver 创建新的CSI驱动
func NewDriver(name, version, nodeID string) *Driver {
d := &Driver{
name: name,
version: version,
nodeID: nodeID,
vcap: map[csi.VolumeCapability_AccessMode_Mode]bool{},
}
// 设置支持的访问模式
d.vcap[csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER] = true
d.vcap[csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY] = true
d.vcap[csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY] = true
// 设置控制器服务能力
d.cscap = []*csi.ControllerServiceCapability{
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
},
},
},
}
// 设置节点服务能力
d.nscap = []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
},
}
return d
}
// Identity Service 实现
// GetPluginInfo 获取插件信息
func (d *Driver) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
return &csi.GetPluginInfoResponse{
Name: d.name,
VendorVersion: d.version,
}, nil
}
// GetPluginCapabilities 获取插件能力
func (d *Driver) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
},
}, nil
}
// Probe 健康检查
func (d *Driver) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
return &csi.ProbeResponse{}, nil
}
// Controller Service 实现
// CreateVolume 创建卷
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if len(req.GetName()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume name missing in request")
}
if req.GetVolumeCapabilities() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
}
// 验证卷能力
for _, cap := range req.GetVolumeCapabilities() {
if cap.GetAccessMode() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume access mode missing")
}
if !d.vcap[cap.GetAccessMode().GetMode()] {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("Unsupported access mode: %v", cap.GetAccessMode().GetMode()))
}
}
// 获取卷大小
size := req.GetCapacityRange().GetRequiredBytes()
if size == 0 {
size = 1 * 1024 * 1024 * 1024 // 默认1GB
}
// 创建卷(这里简化为创建目录)
volumePath := filepath.Join("/var/lib/csi-volumes", req.GetName())
if err := os.MkdirAll(volumePath, 0755); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to create volume directory: %v", err))
}
klog.Infof("Created volume %s at %s with size %d", req.GetName(), volumePath, size)
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: req.GetName(),
CapacityBytes: size,
VolumeContext: req.GetParameters(),
},
}, nil
}
// DeleteVolume 删除卷
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
volumePath := filepath.Join("/var/lib/csi-volumes", req.GetVolumeId())
if err := os.RemoveAll(volumePath); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to delete volume: %v", err))
}
klog.Infof("Deleted volume %s", req.GetVolumeId())
return &csi.DeleteVolumeResponse{}, nil
}
// ControllerPublishVolume 发布卷到节点
func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(req.GetNodeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Node ID missing in request")
}
// 这里可以实现卷的发布逻辑,比如挂载到特定节点
klog.Infof("Published volume %s to node %s", req.GetVolumeId(), req.GetNodeId())
return &csi.ControllerPublishVolumeResponse{}, nil
}
// ControllerUnpublishVolume 从节点取消发布卷
func (d *Driver) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
klog.Infof("Unpublished volume %s from node %s", req.GetVolumeId(), req.GetNodeId())
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
// ValidateVolumeCapabilities 验证卷能力
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if req.GetVolumeCapabilities() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
}
// 验证每个能力
for _, cap := range req.GetVolumeCapabilities() {
if cap.GetAccessMode() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume access mode missing")
}
if !d.vcap[cap.GetAccessMode().GetMode()] {
return &csi.ValidateVolumeCapabilitiesResponse{
Message: fmt.Sprintf("Unsupported access mode: %v", cap.GetAccessMode().GetMode()),
}, nil
}
}
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
VolumeCapabilities: req.GetVolumeCapabilities(),
},
}, nil
}
// ListVolumes 列出卷
func (d *Driver) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "ListVolumes not implemented")
}
// GetCapacity 获取容量
func (d *Driver) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
return nil, status.Error(codes.Unimplemented, "GetCapacity not implemented")
}
// ControllerGetCapabilities 获取控制器能力
func (d *Driver) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
return &csi.ControllerGetCapabilitiesResponse{
Capabilities: d.cscap,
}, nil
}
// CreateSnapshot 创建快照
func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "CreateSnapshot not implemented")
}
// DeleteSnapshot 删除快照
func (d *Driver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "DeleteSnapshot not implemented")
}
// ListSnapshots 列出快照
func (d *Driver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "ListSnapshots not implemented")
}
// ControllerExpandVolume 扩展卷
func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "ControllerExpandVolume not implemented")
}
// Node Service 实现
// NodeStageVolume 暂存卷
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(req.GetStagingTargetPath()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request")
}
// 创建暂存目录
if err := os.MkdirAll(req.GetStagingTargetPath(), 0755); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to create staging directory: %v", err))
}
klog.Infof("Staged volume %s at %s", req.GetVolumeId(), req.GetStagingTargetPath())
return &csi.NodeStageVolumeResponse{}, nil
}
// NodeUnstageVolume 取消暂存卷
func (d *Driver) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(req.GetStagingTargetPath()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request")
}
klog.Infof("Unstaged volume %s from %s", req.GetVolumeId(), req.GetStagingTargetPath())
return &csi.NodeUnstageVolumeResponse{}, nil
}
// NodePublishVolume 发布卷到Pod
func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(req.GetTargetPath()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
// 创建目标目录
if err := os.MkdirAll(req.GetTargetPath(), 0755); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("Failed to create target directory: %v", err))
}
// Bind-mount the volume directory to the target path (simplified here;
// a real driver would perform the mount, e.g. via k8s.io/mount-utils)
volumePath := filepath.Join("/var/lib/csi-volumes", req.GetVolumeId())
klog.Infof("Published volume %s (source %s) to %s", req.GetVolumeId(), volumePath, req.GetTargetPath())
return &csi.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume 从Pod取消发布卷
func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(req.GetTargetPath()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
klog.Infof("Unpublished volume %s from %s", req.GetVolumeId(), req.GetTargetPath())
return &csi.NodeUnpublishVolumeResponse{}, nil
}
// NodeGetVolumeStats 获取卷统计信息
func (d *Driver) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, status.Error(codes.Unimplemented, "NodeGetVolumeStats not implemented")
}
// NodeExpandVolume 扩展节点卷
func (d *Driver) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "NodeExpandVolume not implemented")
}
// NodeGetCapabilities 获取节点能力
func (d *Driver) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{
Capabilities: d.nscap,
}, nil
}
// NodeGetInfo 获取节点信息
func (d *Driver) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
return &csi.NodeGetInfoResponse{
NodeId: d.nodeID,
}, nil
}
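The access-mode check shared by `CreateVolume` and `ValidateVolumeCapabilities` can be sketched in isolation. Plain strings stand in here for the `csi.VolumeCapability_AccessMode_Mode` enum values, purely for illustration:

```go
package main

import "fmt"

// validateAccessModes mirrors the vcap lookup in the driver: every
// requested mode must appear in the supported set.
func validateAccessModes(supported map[string]bool, requested []string) error {
	for _, mode := range requested {
		if !supported[mode] {
			return fmt.Errorf("unsupported access mode: %s", mode)
		}
	}
	return nil
}

func main() {
	// The same three modes the driver enables in NewDriver.
	supported := map[string]bool{
		"SINGLE_NODE_WRITER":      true,
		"SINGLE_NODE_READER_ONLY": true,
		"MULTI_NODE_READER_ONLY":  true,
	}
	fmt.Println(validateAccessModes(supported, []string{"SINGLE_NODE_WRITER"}))
	fmt.Println(validateAccessModes(supported, []string{"MULTI_NODE_MULTI_WRITER"}))
}
```

Because `MULTI_NODE_MULTI_WRITER` is absent from the map, a PVC requesting `ReadWriteMany` would be rejected by this driver.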
12.7.2 Deploying the CSI Driver
# CSI driver deployment configuration
apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
name: simple-csi
spec:
attachRequired: false
podInfoOnMount: true
volumeLifecycleModes:
- Persistent
- Ephemeral
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: simple-csi-node
namespace: kube-system
spec:
selector:
matchLabels:
app: simple-csi-node
template:
metadata:
labels:
app: simple-csi-node
spec:
serviceAccount: simple-csi-node-sa
hostNetwork: true
containers:
- name: node-driver-registrar
image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.5.0
args:
- --v=2
- --csi-address=/csi/csi.sock
- --kubelet-registration-path=/var/lib/kubelet/plugins/simple-csi/csi.sock
env:
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumeMounts:
- name: plugin-dir
mountPath: /csi/
- name: registration-dir
mountPath: /registration/
- name: simple-csi
image: simple-csi:latest
args:
- --endpoint=$(CSI_ENDPOINT)
- --nodeid=$(NODE_ID)
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: NODE_ID
valueFrom:
fieldRef:
fieldPath: spec.nodeName
securityContext:
privileged: true
capabilities:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
volumeMounts:
- name: plugin-dir
mountPath: /csi
- name: pods-mount-dir
mountPath: /var/lib/kubelet
mountPropagation: "Bidirectional"
- name: device-dir
mountPath: /dev
volumes:
- name: registration-dir
hostPath:
path: /var/lib/kubelet/plugins_registry/
type: DirectoryOrCreate
- name: plugin-dir
hostPath:
path: /var/lib/kubelet/plugins/simple-csi/
type: DirectoryOrCreate
- name: pods-mount-dir
hostPath:
path: /var/lib/kubelet
type: Directory
- name: device-dir
hostPath:
path: /dev
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: simple-csi-controller
namespace: kube-system
spec:
replicas: 1
selector:
matchLabels:
app: simple-csi-controller
template:
metadata:
labels:
app: simple-csi-controller
spec:
serviceAccount: simple-csi-controller-sa
containers:
- name: csi-provisioner
image: registry.k8s.io/sig-storage/csi-provisioner:v3.2.0
args:
- --csi-address=$(ADDRESS)
- --v=2
env:
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/csi.sock
volumeMounts:
- name: socket-dir
mountPath: /var/lib/csi/sockets/pluginproxy/
- name: csi-attacher
image: registry.k8s.io/sig-storage/csi-attacher:v3.5.0
args:
- --v=2
- --csi-address=$(ADDRESS)
env:
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/csi.sock
volumeMounts:
- name: socket-dir
mountPath: /var/lib/csi/sockets/pluginproxy/
- name: simple-csi
image: simple-csi:latest
args:
- --endpoint=$(CSI_ENDPOINT)
env:
- name: CSI_ENDPOINT
value: unix:///var/lib/csi/sockets/pluginproxy/csi.sock
volumeMounts:
- name: socket-dir
mountPath: /var/lib/csi/sockets/pluginproxy/
volumes:
- name: socket-dir
emptyDir: {}
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: simple-csi
provisioner: simple-csi
parameters:
type: "local"
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true
12.8 Device Plugin Development
12.8.1 Device Plugin Basics
// Example device plugin
package main
import (
"context"
"fmt"
"net"
"os"
"path"
"time"
"google.golang.org/grpc"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
"k8s.io/klog/v2"
)
const (
resourceName = "example.com/gpu"
serverSock = pluginapi.DevicePluginPath + "gpu.sock"
)
// GPUDevicePlugin GPU设备插件
type GPUDevicePlugin struct {
socket string
server *grpc.Server
devices map[string]*pluginapi.Device
}
// NewGPUDevicePlugin 创建新的GPU设备插件
func NewGPUDevicePlugin() *GPUDevicePlugin {
return &GPUDevicePlugin{
socket: serverSock,
devices: make(map[string]*pluginapi.Device),
}
}
// GetDevicePluginOptions 获取设备插件选项
func (m *GPUDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
return &pluginapi.DevicePluginOptions{}, nil
}
// ListAndWatch 列出并监控设备
func (m *GPUDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
klog.Info("ListAndWatch called")
// 发现设备
m.discoverDevices()
// 发送设备列表
devs := make([]*pluginapi.Device, 0, len(m.devices))
for _, dev := range m.devices {
devs = append(devs, dev)
}
resp := &pluginapi.ListAndWatchResponse{Devices: devs}
if err := s.Send(resp); err != nil {
klog.Errorf("Failed to send response: %v", err)
return err
}
// 持续监控设备变化
for {
time.Sleep(10 * time.Second)
// 这里可以检查设备状态变化
// 如果有变化,重新发送设备列表
}
}
// Allocate 分配设备
func (m *GPUDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
klog.Info("Allocate called")
responses := &pluginapi.AllocateResponse{}
for _, req := range reqs.ContainerRequests {
response := &pluginapi.ContainerAllocateResponse{
Envs: map[string]string{
"GPU_DEVICES": fmt.Sprintf("%v", req.DevicesIDs),
},
Mounts: []*pluginapi.Mount{
{
ContainerPath: "/dev/gpu0",
HostPath: "/dev/gpu0",
ReadOnly: false,
},
},
Devices: []*pluginapi.DeviceSpec{
{
ContainerPath: "/dev/gpu0",
HostPath: "/dev/gpu0",
Permissions: "rwm",
},
},
}
// Note: kubelet itself tracks which device IDs are allocated. The plugin
// must NOT mark allocated devices Unhealthy here -- Unhealthy removes
// devices from the node's capacity entirely, shrinking it on every
// allocation.
responses.ContainerResponses = append(responses.ContainerResponses, response)
}
return responses, nil
}
// GetPreferredAllocation 获取首选分配
func (m *GPUDevicePlugin) GetPreferredAllocation(ctx context.Context, req *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
return &pluginapi.PreferredAllocationResponse{}, nil
}
// PreStartContainer 容器启动前处理
func (m *GPUDevicePlugin) PreStartContainer(ctx context.Context, req *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
return &pluginapi.PreStartContainerResponse{}, nil
}
// discoverDevices 发现设备
func (m *GPUDevicePlugin) discoverDevices() {
// 模拟发现GPU设备
for i := 0; i < 2; i++ {
deviceID := fmt.Sprintf("gpu-%d", i)
m.devices[deviceID] = &pluginapi.Device{
ID: deviceID,
Health: pluginapi.Healthy,
}
}
klog.Infof("Discovered %d GPU devices", len(m.devices))
}
// Start 启动设备插件
func (m *GPUDevicePlugin) Start() error {
err := m.cleanup()
if err != nil {
return err
}
sock, err := net.Listen("unix", m.socket)
if err != nil {
return err
}
m.server = grpc.NewServer()
pluginapi.RegisterDevicePluginServer(m.server, m)
go func() {
err := m.server.Serve(sock)
if err != nil {
klog.Errorf("Failed to serve: %v", err)
}
}()
// 等待服务器启动
conn, err := m.dial(m.socket, 5*time.Second)
if err != nil {
return err
}
conn.Close()
return nil
}
// Stop 停止设备插件
func (m *GPUDevicePlugin) Stop() error {
if m.server == nil {
return nil
}
m.server.Stop()
m.server = nil
return m.cleanup()
}
// Register 注册设备插件
func (m *GPUDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
conn, err := m.dial(kubeletEndpoint, 5*time.Second)
if err != nil {
return err
}
defer conn.Close()
client := pluginapi.NewRegistrationClient(conn)
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: path.Base(m.socket),
ResourceName: resourceName,
}
_, err = client.Register(context.Background(), reqt)
if err != nil {
return err
}
return nil
}
// dial connects to the given unix socket
func (m *GPUDevicePlugin) dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// grpc.WithTimeout/WithDialer are deprecated; use DialContext with a
// context dialer instead.
c, err := grpc.DialContext(ctx, unixSocketPath,
grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
}),
)
if err != nil {
return nil, err
}
return c, nil
}
// cleanup 清理资源
func (m *GPUDevicePlugin) cleanup() error {
if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
func main() {
klog.Info("Starting GPU device plugin")
plugin := NewGPUDevicePlugin()
if err := plugin.Start(); err != nil {
klog.Fatalf("Failed to start device plugin: %v", err)
}
if err := plugin.Register(pluginapi.KubeletSocket, resourceName); err != nil {
klog.Fatalf("Failed to register device plugin: %v", err)
}
klog.Info("GPU device plugin registered")
// 保持运行
select {}
}
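As noted in `Allocate`, it is kubelet, not the plugin, that tracks which device IDs are in use. A standalone sketch of that bookkeeping (hypothetical helper, not part of the device plugin API):

```go
package main

import "fmt"

// allocateDevices sketches kubelet's per-resource bookkeeping: take
// `count` device IDs from the free pool and return the allocation plus
// the remaining pool.
func allocateDevices(free []string, count int) (allocated, remaining []string, err error) {
	if count > len(free) {
		return nil, nil, fmt.Errorf("requested %d devices, only %d free", count, len(free))
	}
	return free[:count], free[count:], nil
}

func main() {
	free := []string{"gpu-0", "gpu-1"}
	got, rest, err := allocateDevices(free, 1)
	if err != nil {
		panic(err)
	}
	fmt.Println("allocated:", got, "free:", rest)
}
```

When a Pod requesting `example.com/gpu: 1` is admitted, kubelet picks concrete IDs like this and passes them to the plugin's `Allocate` in `DevicesIDs`.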
12.8.2 Deploying the Device Plugin
# Device plugin DaemonSet
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: gpu-device-plugin
namespace: kube-system
spec:
selector:
matchLabels:
name: gpu-device-plugin
template:
metadata:
labels:
name: gpu-device-plugin
spec:
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
containers:
- image: gpu-device-plugin:latest
name: gpu-device-plugin
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop: ["ALL"]
volumeMounts:
- name: device-plugin
mountPath: /var/lib/kubelet/device-plugins
- name: dev
mountPath: /dev
volumes:
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins
- name: dev
hostPath:
path: /dev
nodeSelector:
accelerator: gpu
12.9 Admission Controllers
12.9.1 Validating Admission Webhook
// Validating admission webhook
package main
import (
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/klog/v2"
)
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
)
// WebhookServer wraps the webhook HTTP server
type WebhookServer struct {
server *http.Server
}
// WhSvrParameters holds the webhook server parameters
type WhSvrParameters struct {
port int // webhook server port
certFile string // path to the TLS certificate file
keyFile string // path to the TLS private key file
}
// validate checks a Pod against the admission policy
func (whsvr *WebhookServer) validate(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
req := ar.Request
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
klog.Errorf("Could not unmarshal raw object: %v", err)
return &admissionv1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}
klog.Infof("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v patchOperation=%v UserInfo=%v",
req.Kind, req.Namespace, req.Name, req.UID, req.Operation, req.UserInfo)
allowed := true
message := ""
// Rule 1: reject privileged containers
for _, container := range pod.Spec.Containers {
if container.SecurityContext != nil && container.SecurityContext.Privileged != nil && *container.SecurityContext.Privileged {
allowed = false
message = "Privileged containers are not allowed"
break
}
}
// Rule 2: reject hostNetwork
if allowed && pod.Spec.HostNetwork {
allowed = false
message = "HostNetwork is not allowed"
}
// Rule 3: require images from the approved registry
if allowed {
for _, container := range pod.Spec.Containers {
if !strings.HasPrefix(container.Image, "registry.company.com/") {
allowed = false
message = fmt.Sprintf("Image %s is not from approved registry", container.Image)
break
}
}
}
// Rule 4: require resource limits
if allowed {
for _, container := range pod.Spec.Containers {
if container.Resources.Limits == nil {
allowed = false
message = "Resource limits must be specified"
break
}
if container.Resources.Limits.Cpu().IsZero() || container.Resources.Limits.Memory().IsZero() {
allowed = false
message = "CPU and memory limits must be specified"
break
}
}
}
return &admissionv1.AdmissionResponse{
UID: req.UID,
Allowed: allowed,
Result: &metav1.Status{
Message: message,
},
}
}
// serve handles incoming admission review requests
func (whsvr *WebhookServer) serve(w http.ResponseWriter, r *http.Request) {
var body []byte
if r.Body != nil {
if data, err := ioutil.ReadAll(r.Body); err == nil {
body = data
}
}
if len(body) == 0 {
klog.Error("empty body")
http.Error(w, "empty body", http.StatusBadRequest)
return
}
// verify the Content-Type
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
klog.Errorf("Content-Type=%s, expect application/json", contentType)
http.Error(w, "invalid Content-Type, expect `application/json`", http.StatusUnsupportedMediaType)
return
}
var admissionResponse *admissionv1.AdmissionResponse
ar := admissionv1.AdmissionReview{}
if _, _, err := codecs.UniversalDeserializer().Decode(body, nil, &ar); err != nil {
klog.Errorf("Could not decode body: %v", err)
admissionResponse = &admissionv1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
} else {
admissionResponse = whsvr.validate(&ar)
}
admissionReview := admissionv1.AdmissionReview{}
// admission/v1 requires apiVersion and kind to be set on the response object.
admissionReview.APIVersion = "admission.k8s.io/v1"
admissionReview.Kind = "AdmissionReview"
if admissionResponse != nil {
admissionReview.Response = admissionResponse
if ar.Request != nil {
admissionReview.Response.UID = ar.Request.UID
}
}
respBytes, err := json.Marshal(admissionReview)
if err != nil {
klog.Errorf("Could not marshal response: %v", err)
http.Error(w, "could not marshal response", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(respBytes); err != nil {
klog.Error(err)
}
}
func main() {
var parameters WhSvrParameters
parameters.port = 8443
parameters.certFile = "/etc/webhook/certs/tls.crt"
parameters.keyFile = "/etc/webhook/certs/tls.key"
cert, err := tls.LoadX509KeyPair(parameters.certFile, parameters.keyFile)
if err != nil {
klog.Fatalf("Failed to load key pair: %v", err)
}
server := &WebhookServer{
server: &http.Server{
Addr: fmt.Sprintf(":%v", parameters.port),
TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},
},
}
mux := http.NewServeMux()
mux.HandleFunc("/validate", server.serve)
server.server.Handler = mux
klog.Info("Starting webhook server...")
if err := server.server.ListenAndServeTLS("", ""); err != nil {
klog.Fatalf("Failed to start webhook server: %v", err)
}
}
12.9.2 Mutating Admission Webhook
// Mutating admission webhook (extends the WebhookServer from 12.9.1)
package main
import (
"encoding/json"
"fmt"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)
// mutate builds JSON patches for the Pod
func (whsvr *WebhookServer) mutate(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
req := ar.Request
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
klog.Errorf("Could not unmarshal raw object: %v", err)
return &admissionv1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}
klog.Infof("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v patchOperation=%v UserInfo=%v",
req.Kind, req.Namespace, req.Name, req.UID, req.Operation, req.UserInfo)
var patches []map[string]interface{}
// Rule 1: add default labels
if pod.Labels == nil {
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/metadata/labels",
"value": map[string]string{},
})
}
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/metadata/labels/injected-by",
"value": "admission-webhook",
})
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/metadata/labels/injection-timestamp",
// Label values may not contain ":", so use a colon-free timestamp format.
"value": metav1.Now().Format("20060102T150405Z"),
})
// Rule 2: inject a logging sidecar
if req.Namespace != "kube-system" && req.Namespace != "kube-public" {
sidecarContainer := corev1.Container{
Name: "logging-sidecar",
Image: "fluent/fluent-bit:latest",
VolumeMounts: []corev1.VolumeMount{
{
Name: "varlog",
MountPath: "/var/log",
},
},
}
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/spec/containers/-",
"value": sidecarContainer,
})
// add the matching volume
logVolume := corev1.Volume{
Name: "varlog",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
}
if len(pod.Spec.Volumes) == 0 {
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/spec/volumes",
"value": []corev1.Volume{logVolume},
})
} else {
patches = append(patches, map[string]interface{}{
"op": "add",
"path": "/spec/volumes/-",
"value": logVolume,
})
}
}
// Rule 3: apply default resource limits and requests
for i, container := range pod.Spec.Containers {
if container.Resources.Limits == nil {
patches = append(patches, map[string]interface{}{
"op": "add",
"path": fmt.Sprintf("/spec/containers/%d/resources/limits", i),
"value": map[string]string{
"cpu": "500m",
"memory": "512Mi",
},
})
}
if container.Resources.Requests == nil {
patches = append(patches, map[string]interface{}{
"op": "add",
"path": fmt.Sprintf("/spec/containers/%d/resources/requests", i),
"value": map[string]string{
"cpu": "100m",
"memory": "128Mi",
},
})
}
}
// Rule 4: apply a default security context
for i, container := range pod.Spec.Containers {
if container.SecurityContext == nil {
patches = append(patches, map[string]interface{}{
"op": "add",
"path": fmt.Sprintf("/spec/containers/%d/securityContext", i),
"value": map[string]interface{}{
"runAsNonRoot": true,
"runAsUser": 1000,
"allowPrivilegeEscalation": false,
"readOnlyRootFilesystem": true,
"capabilities": map[string]interface{}{
"drop": []string{"ALL"},
},
},
})
}
}
patchBytes, err := json.Marshal(patches)
if err != nil {
return &admissionv1.AdmissionResponse{
Result: &metav1.Status{Message: err.Error()},
}
}
return &admissionv1.AdmissionResponse{
UID: req.UID,
Allowed: true,
Patch: patchBytes,
PatchType: func() *admissionv1.PatchType {
pt := admissionv1.PatchTypeJSONPatch
return &pt
}(),
}
}
12.9.3 Admission Webhook Deployment
# Admission webhook deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: admission-webhook
namespace: webhook-system
spec:
replicas: 2
selector:
matchLabels:
app: admission-webhook
template:
metadata:
labels:
app: admission-webhook
spec:
serviceAccountName: admission-webhook
containers:
- name: webhook
image: admission-webhook:latest
ports:
- containerPort: 8443
name: webhook-api
volumeMounts:
- name: webhook-tls-certs
mountPath: /etc/webhook/certs
readOnly: true
env:
- name: TLS_CERT_FILE
value: /etc/webhook/certs/tls.crt
- name: TLS_PRIVATE_KEY_FILE
value: /etc/webhook/certs/tls.key
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 100m
memory: 128Mi
livenessProbe:
httpGet:
path: /health
port: 8443
scheme: HTTPS
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8443
scheme: HTTPS
initialDelaySeconds: 5
periodSeconds: 5
volumes:
- name: webhook-tls-certs
secret:
secretName: webhook-tls
---
apiVersion: v1
kind: Service
metadata:
name: admission-webhook-service
namespace: webhook-system
spec:
selector:
app: admission-webhook
ports:
- name: webhook-api
port: 443
targetPort: 8443
protocol: TCP
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: pod-policy-webhook
webhooks:
- name: pod-policy.example.com
clientConfig:
service:
name: admission-webhook-service
namespace: webhook-system
path: "/validate"
caBundle: LS0tLS1CRUdJTi... # base64-encoded CA certificate
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: [""]
apiVersions: ["v1"]
resources: ["pods"]
admissionReviewVersions: ["v1", "v1beta1"]
sideEffects: None
failurePolicy: Fail
---
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: pod-mutating-webhook
webhooks:
- name: pod-mutating.example.com
clientConfig:
service:
name: admission-webhook-service
namespace: webhook-system
path: "/mutate"
caBundle: LS0tLS1CRUdJTi... # base64-encoded CA certificate
rules:
- operations: ["CREATE"]
apiGroups: [""]
apiVersions: ["v1"]
resources: ["pods"]
admissionReviewVersions: ["v1", "v1beta1"]
sideEffects: None
failurePolicy: Fail
12.10 Extension Best Practices
12.10.1 Development Best Practices
#!/bin/bash
# Extension development checklist script
echo "=== Kubernetes extension development best-practice checks ==="
# 1. CRD design checks
check_crd_design() {
echo -e "\n1. CRD design checks:"
# Check the CRD versioning strategy
echo " - Checking CRD versioning strategy..."
kubectl get crd -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.spec.versions[*].name}{"\n"}{end}' | \
while read crd versions; do
if [[ $(echo $versions | wc -w) -gt 1 ]]; then
echo " ✓ CRD $crd serves multiple versions: $versions"
else
echo " ⚠ CRD $crd serves a single version: $versions"
fi
done
# Check CRD schema validation
echo " - Checking CRD schema validation..."
kubectl get crd -o json | jq -r '.items[] | select(.spec.versions[].schema.openAPIV3Schema.properties != null) | .metadata.name' | \
while read crd; do
echo " ✓ CRD $crd defines schema validation"
done
}
# 2. Operator pattern checks
check_operator_pattern() {
echo -e "\n2. Operator pattern checks:"
# Check controller deployments
echo " - Checking controller deployments..."
kubectl get deployment -A -l "app.kubernetes.io/component=controller" -o jsonpath='{range .items[*]}{.metadata.namespace}{"\t"}{.metadata.name}{"\t"}{.spec.replicas}{"\n"}{end}' | \
while read namespace name replicas; do
if [[ $replicas -gt 1 ]]; then
echo " ✓ Controller $namespace/$name is configured for high availability (replicas: $replicas)"
else
echo " ⚠ Controller $namespace/$name is not highly available (replicas: $replicas)"
fi
done
# Check RBAC permissions
echo " - Checking RBAC permissions..."
kubectl get clusterrole -o json | jq -r '.items[] | select(.rules[]?.verbs[]? == "*") | .metadata.name' | \
while read role; do
echo " ⚠ ClusterRole $role grants overly broad permissions"
done
}
# 3. Admission webhook checks
check_admission_controllers() {
echo -e "\n3. Admission webhook checks:"
# Check webhook configurations
echo " - Checking ValidatingWebhookConfigurations..."
kubectl get validatingwebhookconfigurations -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.webhooks[*].failurePolicy}{"\n"}{end}' | \
while read webhook policy; do
if [[ "$policy" == "Fail" ]]; then
echo " ✓ Webhook $webhook uses failurePolicy Fail"
else
echo " ⚠ Webhook $webhook uses failurePolicy Ignore, which may weaken enforcement"
fi
done
echo " - Checking MutatingWebhookConfigurations..."
kubectl get mutatingwebhookconfigurations -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.webhooks[*].sideEffects}{"\n"}{end}' | \
while read webhook sideEffects; do
if [[ "$sideEffects" == "None" ]]; then
echo " ✓ Webhook $webhook declares no side effects"
else
echo " ⚠ Webhook $webhook may have side effects: $sideEffects"
fi
done
}
# 4. Performance and reliability checks
check_performance_reliability() {
echo -e "\n4. Performance and reliability checks:"
# Check resource limits
echo " - Checking extension component resource limits..."
kubectl get deployment -A -o json | jq -r '.items[] | select(.metadata.labels["app.kubernetes.io/component"] == "controller") | "\(.metadata.namespace)/\(.metadata.name)\t\(.spec.template.spec.containers[0].resources.limits // "none")"' | \
while read deployment limits; do
if [[ "$limits" == "none" ]]; then
echo " ⚠ Deployment $deployment has no resource limits"
else
echo " ✓ Deployment $deployment has resource limits"
fi
done
# Check health probes
echo " - Checking health probe configuration..."
kubectl get deployment -A -o json | jq -r '.items[] | select(.metadata.labels["app.kubernetes.io/component"] == "controller") | "\(.metadata.namespace)/\(.metadata.name)\t\(.spec.template.spec.containers[0].livenessProbe != null)\t\(.spec.template.spec.containers[0].readinessProbe != null)"' | \
while read deployment liveness readiness; do
if [[ "$liveness" == "true" && "$readiness" == "true" ]]; then
echo " ✓ Deployment $deployment has both liveness and readiness probes"
else
echo " ⚠ Deployment $deployment is missing health probes"
fi
done
}
# 5. Security checks
check_security() {
echo -e "\n5. Security checks:"
# Check ServiceAccount usage
echo " - Checking ServiceAccount usage..."
kubectl get deployment -A -o json | jq -r '.items[] | select(.metadata.labels["app.kubernetes.io/component"] == "controller") | "\(.metadata.namespace)/\(.metadata.name)\t\(.spec.template.spec.serviceAccountName // "default")"' | \
while read deployment sa; do
if [[ "$sa" == "default" ]]; then
echo " ⚠ Deployment $deployment uses the default ServiceAccount"
else
echo " ✓ Deployment $deployment uses a dedicated ServiceAccount: $sa"
fi
done
# Check security contexts
echo " - Checking security contexts..."
kubectl get deployment -A -o json | jq -r '.items[] | select(.metadata.labels["app.kubernetes.io/component"] == "controller") | "\(.metadata.namespace)/\(.metadata.name)\t\(.spec.template.spec.containers[0].securityContext.runAsNonRoot // false)"' | \
while read deployment nonRoot; do
if [[ "$nonRoot" == "true" ]]; then
echo " ✓ Deployment $deployment runs as non-root"
else
echo " ⚠ Deployment $deployment may run as root"
fi
done
}
# Run all checks
check_crd_design
check_operator_pattern
check_admission_controllers
check_performance_reliability
check_security
echo -e "\n=== Checks complete ==="
12.10.2 Deployment and Operations Best Practices
# Extension component deployment template
apiVersion: v1
kind: Namespace
metadata:
name: extension-system
labels:
name: extension-system
pod-security.kubernetes.io/enforce: restricted
pod-security.kubernetes.io/audit: restricted
pod-security.kubernetes.io/warn: restricted
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: extension-controller
namespace: extension-system
automountServiceAccountToken: true
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: extension-controller
rules:
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "patch"]
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["custom.io"]
resources: ["customresources"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["custom.io"]
resources: ["customresources/status"]
verbs: ["get", "update", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: extension-controller
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: extension-controller
subjects:
- kind: ServiceAccount
name: extension-controller
namespace: extension-system
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: extension-controller
namespace: extension-system
labels:
app.kubernetes.io/name: extension-controller
app.kubernetes.io/component: controller
app.kubernetes.io/version: "1.0.0"
spec:
replicas: 2
selector:
matchLabels:
app.kubernetes.io/name: extension-controller
template:
metadata:
labels:
app.kubernetes.io/name: extension-controller
app.kubernetes.io/component: controller
spec:
serviceAccountName: extension-controller
securityContext:
runAsNonRoot: true
runAsUser: 65532
fsGroup: 65532
seccompProfile:
type: RuntimeDefault
containers:
- name: controller
image: extension-controller:v1.0.0
imagePullPolicy: IfNotPresent
command:
- /manager
args:
- --leader-elect
- --metrics-bind-address=:8080
- --health-probe-bind-address=:8081
env:
- name: WATCH_NAMESPACE
value: ""
ports:
- name: metrics
containerPort: 8080
protocol: TCP
- name: health
containerPort: 8081
protocol: TCP
livenessProbe:
httpGet:
path: /healthz
port: health
initialDelaySeconds: 15
periodSeconds: 20
readinessProbe:
httpGet:
path: /readyz
port: health
initialDelaySeconds: 5
periodSeconds: 10
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 100m
memory: 128Mi
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop:
- ALL
volumeMounts:
- name: tmp
mountPath: /tmp
volumes:
- name: tmp
emptyDir: {}
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- extension-controller
topologyKey: kubernetes.io/hostname
tolerations:
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
12.11 Summary
This chapter took a detailed look at Kubernetes' advanced features and extension mechanisms, including:
Core extension mechanisms
- Custom Resource Definitions (CRD): extend the Kubernetes API with custom resource types
- Operator pattern: automate the operation of applications on top of CRDs
- Scheduler extensions: customize scheduling logic and optimize Pod placement
- Network plugins (CNI): implement custom networking solutions
- Storage plugins (CSI): integrate external storage systems
- Device plugins: manage and allocate special hardware resources
- Admission controllers: enforce policy validation and resource mutation
Development essentials
- CRD design: sound API design, version management, and field validation
- Controller pattern: declarative APIs, control loops, and state reconciliation
- Error handling: retry mechanisms, status updates, and event recording
- Performance: caching, batching, and resource limits
Deployment best practices
- Security: least privilege, security contexts, and network policies
- Reliability: highly available deployments, health checks, and failure recovery
- Observability: metrics, logging, and distributed tracing
- Operability: configuration management, version control, and upgrade strategy
Use cases
- Application management: automated operation of stateful applications such as databases and message queues
- Infrastructure: integration of networking, storage, and security components
- Policy enforcement: automated security policies, compliance checks, and resource quotas
- Workflow orchestration: automated orchestration and execution of complex business processes
Kubernetes' extension mechanisms give users powerful customization capabilities; applied judiciously, they make it possible to build a container platform tailored to specific business needs. In the next chapter we will explore the Kubernetes ecosystem and toolchain, and see how community tools can boost development and operations efficiency.