3.1 路由系统基础

3.1.1 基本路由定义

from flask import Flask

app = Flask(__name__)

# 基本路由
@app.route('/')
def index():
    return 'Hello, Flask!'

# 指定HTTP方法
@app.route('/users', methods=['GET', 'POST'])
def users():
    if request.method == 'POST':
        return 'Create user'
    return 'List users'

# 多个URL指向同一视图
@app.route('/about')
@app.route('/about-us')
def about():
    return 'About page'

if __name__ == '__main__':
    app.run(debug=True)

3.1.2 URL规则和变量

# 字符串参数(默认)
@app.route('/user/<username>')
def show_user(username):
    return f'User: {username}'

# 整数参数
@app.route('/post/<int:post_id>')
def show_post(post_id):
    return f'Post ID: {post_id}'

# 浮点数参数
@app.route('/price/<float:price>')
def show_price(price):
    return f'Price: ${price:.2f}'

# UUID参数
@app.route('/user/<uuid:user_id>')
def show_user_by_id(user_id):
    return f'User UUID: {user_id}'

# 路径参数(包含斜杠)
@app.route('/path/<path:subpath>')
def show_subpath(subpath):
    return f'Subpath: {subpath}'

# 多个参数
@app.route('/user/<username>/post/<int:post_id>')
def show_user_post(username, post_id):
    return f'User: {username}, Post: {post_id}'

3.1.3 URL构建和重定向

from flask import url_for, redirect, request

@app.route('/')
def index():
    return f'''
    <h1>Flask路由示例</h1>
    <ul>
        <li><a href="{url_for('about')}">关于我们</a></li>
        <li><a href="{url_for('show_user', username='john')}">用户John</a></li>
        <li><a href="{url_for('show_post', post_id=1)}">文章1</a></li>
    </ul>
    '''

@app.route('/login')
def login():
    # 重定向到首页
    return redirect(url_for('index'))

@app.route('/admin')
def admin():
    # 条件重定向
    if not session.get('logged_in'):
        return redirect(url_for('login'))
    return 'Admin Dashboard'

# 外部重定向
@app.route('/external')
def external_redirect():
    return redirect('https://www.example.com')

3.2 HTTP方法处理

3.2.1 常用HTTP方法

from flask import request, jsonify

# GET请求
@app.route('/api/users', methods=['GET'])
def get_users():
    page = request.args.get('page', 1, type=int)
    limit = request.args.get('limit', 10, type=int)
    
    # 模拟用户数据
    users = [
        {'id': i, 'name': f'User {i}', 'email': f'user{i}@example.com'}
        for i in range((page-1)*limit + 1, page*limit + 1)
    ]
    
    return jsonify({
        'users': users,
        'page': page,
        'limit': limit,
        'total': 100
    })

# POST请求
@app.route('/api/users', methods=['POST'])
def create_user():
    data = request.get_json()
    
    # 验证数据
    if not data or 'name' not in data or 'email' not in data:
        return jsonify({'error': 'Missing required fields'}), 400
    
    # 创建用户逻辑
    user = {
        'id': 101,
        'name': data['name'],
        'email': data['email'],
        'created_at': '2023-01-01T00:00:00Z'
    }
    
    return jsonify(user), 201

# PUT请求
@app.route('/api/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    data = request.get_json()
    
    # 更新用户逻辑
    user = {
        'id': user_id,
        'name': data.get('name', 'Updated User'),
        'email': data.get('email', 'updated@example.com'),
        'updated_at': '2023-01-01T00:00:00Z'
    }
    
    return jsonify(user)

# DELETE请求
@app.route('/api/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    # 删除用户逻辑
    return '', 204

# PATCH请求
@app.route('/api/users/<int:user_id>', methods=['PATCH'])
def patch_user(user_id):
    data = request.get_json()
    
    # 部分更新逻辑
    updated_fields = {}
    if 'name' in data:
        updated_fields['name'] = data['name']
    if 'email' in data:
        updated_fields['email'] = data['email']
    
    return jsonify({
        'id': user_id,
        'updated_fields': updated_fields,
        'updated_at': '2023-01-01T00:00:00Z'
    })

3.2.2 请求数据处理

from flask import request, render_template

@app.route('/form', methods=['GET', 'POST'])
def handle_form():
    if request.method == 'GET':
        return render_template('form.html')
    
    elif request.method == 'POST':
        # 表单数据
        username = request.form.get('username')
        email = request.form.get('email')
        
        # 文件上传
        file = request.files.get('avatar')
        if file and file.filename:
            filename = secure_filename(file.filename)
            file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        
        # 处理复选框
        interests = request.form.getlist('interests')
        
        # 处理JSON数据
        if request.is_json:
            data = request.get_json()
            return jsonify({'received': data})
        
        return f'用户: {username}, 邮箱: {email}, 兴趣: {interests}'

# 查询参数处理
@app.route('/search')
def search():
    query = request.args.get('q', '')
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    
    # 获取所有查询参数
    all_args = request.args.to_dict()
    
    return jsonify({
        'query': query,
        'page': page,
        'per_page': per_page,
        'all_args': all_args
    })

# 请求头处理
@app.route('/headers')
def show_headers():
    user_agent = request.headers.get('User-Agent')
    accept_language = request.headers.get('Accept-Language')
    
    # 获取所有请求头
    all_headers = dict(request.headers)
    
    return jsonify({
        'user_agent': user_agent,
        'accept_language': accept_language,
        'all_headers': all_headers
    })

3.3 视图函数设计

3.3.1 基本视图函数

from flask import render_template, jsonify, make_response
from datetime import datetime

# 简单视图
@app.route('/')
def home():
    return render_template('index.html', 
                         title='首页',
                         current_time=datetime.now())

# 返回JSON
@app.route('/api/status')
def api_status():
    return jsonify({
        'status': 'ok',
        'timestamp': datetime.now().isoformat(),
        'version': '1.0.0'
    })

# 自定义响应
@app.route('/custom')
def custom_response():
    response = make_response('Custom Response')
    response.headers['X-Custom-Header'] = 'Custom Value'
    response.status_code = 200
    return response

# 返回文件
@app.route('/download/<filename>')
def download_file(filename):
    return send_from_directory(
        app.config['UPLOAD_FOLDER'], 
        filename, 
        as_attachment=True
    )

3.3.2 视图类(MethodView)

from flask.views import MethodView
from flask import request, jsonify

class UserAPI(MethodView):
    """用户API视图类"""
    
    def get(self, user_id=None):
        """获取用户信息"""
        if user_id is None:
            # 获取用户列表
            users = [
                {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
                {'id': 2, 'name': 'Bob', 'email': 'bob@example.com'}
            ]
            return jsonify(users)
        else:
            # 获取单个用户
            user = {'id': user_id, 'name': f'User {user_id}', 'email': f'user{user_id}@example.com'}
            return jsonify(user)
    
    def post(self):
        """创建用户"""
        data = request.get_json()
        
        # 验证数据
        if not data or 'name' not in data:
            return jsonify({'error': 'Name is required'}), 400
        
        # 创建用户
        user = {
            'id': 3,
            'name': data['name'],
            'email': data.get('email', ''),
            'created_at': datetime.now().isoformat()
        }
        
        return jsonify(user), 201
    
    def put(self, user_id):
        """更新用户"""
        data = request.get_json()
        
        user = {
            'id': user_id,
            'name': data.get('name', f'User {user_id}'),
            'email': data.get('email', f'user{user_id}@example.com'),
            'updated_at': datetime.now().isoformat()
        }
        
        return jsonify(user)
    
    def delete(self, user_id):
        """删除用户"""
        return '', 204

# 注册视图类
user_view = UserAPI.as_view('user_api')
app.add_url_rule('/api/users/', defaults={'user_id': None}, 
                 view_func=user_view, methods=['GET'])
app.add_url_rule('/api/users/', view_func=user_view, methods=['POST'])
app.add_url_rule('/api/users/<int:user_id>', view_func=user_view, 
                 methods=['GET', 'PUT', 'DELETE'])

3.3.3 装饰器和中间件

from functools import wraps
from flask import session, abort, g
import time

# 登录验证装饰器
def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            abort(401)
        return f(*args, **kwargs)
    return decorated_function

# 权限验证装饰器
def admin_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not session.get('is_admin'):
            abort(403)
        return f(*args, **kwargs)
    return decorated_function

# 性能监控装饰器
def timing(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        start_time = time.time()
        result = f(*args, **kwargs)
        end_time = time.time()
        
        app.logger.info(f'{f.__name__} took {end_time - start_time:.4f} seconds')
        return result
    return decorated_function

# 缓存装饰器
def cache_response(timeout=300):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            cache_key = f'{f.__name__}:{request.url}'
            
            # 尝试从缓存获取
            cached_result = cache.get(cache_key)
            if cached_result:
                return cached_result
            
            # 执行函数并缓存结果
            result = f(*args, **kwargs)
            cache.set(cache_key, result, timeout=timeout)
            return result
        return decorated_function
    return decorator

# 使用装饰器
@app.route('/admin/dashboard')
@login_required
@admin_required
@timing
def admin_dashboard():
    return render_template('admin/dashboard.html')

@app.route('/api/expensive-operation')
@cache_response(timeout=600)
def expensive_operation():
    # 模拟耗时操作
    time.sleep(2)
    return jsonify({'result': 'expensive computation result'})

3.4 错误处理

3.4.1 HTTP错误处理

from flask import render_template, jsonify, request

# 404错误处理
@app.errorhandler(404)
def not_found_error(error):
    if request.path.startswith('/api/'):
        return jsonify({'error': 'Resource not found'}), 404
    return render_template('errors/404.html'), 404

# 500错误处理
@app.errorhandler(500)
def internal_error(error):
    app.logger.error(f'Server Error: {error}')
    if request.path.startswith('/api/'):
        return jsonify({'error': 'Internal server error'}), 500
    return render_template('errors/500.html'), 500

# 403错误处理
@app.errorhandler(403)
def forbidden_error(error):
    if request.path.startswith('/api/'):
        return jsonify({'error': 'Access forbidden'}), 403
    return render_template('errors/403.html'), 403

# 400错误处理
@app.errorhandler(400)
def bad_request_error(error):
    if request.path.startswith('/api/'):
        return jsonify({'error': 'Bad request'}), 400
    return render_template('errors/400.html'), 400

3.4.2 自定义异常处理

# 自定义异常类
class ValidationError(Exception):
    """验证错误异常"""
    def __init__(self, message, status_code=400, payload=None):
        super().__init__()
        self.message = message
        self.status_code = status_code
        self.payload = payload

class AuthenticationError(Exception):
    """认证错误异常"""
    def __init__(self, message='Authentication required'):
        super().__init__()
        self.message = message
        self.status_code = 401

class AuthorizationError(Exception):
    """授权错误异常"""
    def __init__(self, message='Access denied'):
        super().__init__()
        self.message = message
        self.status_code = 403

# 异常处理器
@app.errorhandler(ValidationError)
def handle_validation_error(e):
    response = jsonify({
        'error': 'Validation Error',
        'message': e.message,
        'payload': e.payload
    })
    response.status_code = e.status_code
    return response

@app.errorhandler(AuthenticationError)
def handle_authentication_error(e):
    response = jsonify({
        'error': 'Authentication Error',
        'message': e.message
    })
    response.status_code = e.status_code
    return response

@app.errorhandler(AuthorizationError)
def handle_authorization_error(e):
    response = jsonify({
        'error': 'Authorization Error',
        'message': e.message
    })
    response.status_code = e.status_code
    return response

# 使用自定义异常
@app.route('/api/validate')
def validate_data():
    data = request.get_json()
    
    if not data:
        raise ValidationError('No data provided')
    
    if 'email' not in data:
        raise ValidationError('Email is required', payload={'field': 'email'})
    
    return jsonify({'message': 'Data is valid'})

3.4.3 全局异常处理

import traceback
from flask import current_app

# 全局异常处理器
@app.errorhandler(Exception)
def handle_exception(e):
    # 记录异常信息
    current_app.logger.error(f'Unhandled exception: {str(e)}')
    current_app.logger.error(traceback.format_exc())
    
    # 开发环境显示详细错误
    if current_app.debug:
        return jsonify({
            'error': 'Internal Server Error',
            'message': str(e),
            'traceback': traceback.format_exc()
        }), 500
    
    # 生产环境返回通用错误
    if request.path.startswith('/api/'):
        return jsonify({'error': 'Internal server error'}), 500
    
    return render_template('errors/500.html'), 500

# 请求前处理
@app.before_request
def before_request():
    # 记录请求信息
    app.logger.info(f'{request.method} {request.url} from {request.remote_addr}')
    
    # 设置请求开始时间
    g.start_time = time.time()

# 请求后处理
@app.after_request
def after_request(response):
    # 计算请求处理时间
    if hasattr(g, 'start_time'):
        duration = time.time() - g.start_time
        app.logger.info(f'Request completed in {duration:.4f}s with status {response.status_code}')
    
    # 添加安全头
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'DENY'
    response.headers['X-XSS-Protection'] = '1; mode=block'
    
    return response

3.5 蓝图(Blueprints)

3.5.1 基本蓝图使用

# auth.py - 认证蓝图
from flask import Blueprint, render_template, request, session, redirect, url_for

auth_bp = Blueprint('auth', __name__, url_prefix='/auth')

@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        
        # 验证用户
        if username == 'admin' and password == 'secret':
            session['user_id'] = 1
            session['username'] = username
            return redirect(url_for('main.dashboard'))
        else:
            return render_template('auth/login.html', error='Invalid credentials')
    
    return render_template('auth/login.html')

@auth_bp.route('/logout')
def logout():
    session.clear()
    return redirect(url_for('main.index'))

@auth_bp.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'POST':
        # 注册逻辑
        return redirect(url_for('auth.login'))
    
    return render_template('auth/register.html')
# main.py - 主蓝图
from flask import Blueprint, render_template, session

main_bp = Blueprint('main', __name__)

@main_bp.route('/')
def index():
    return render_template('index.html')

@main_bp.route('/dashboard')
def dashboard():
    if 'user_id' not in session:
        return redirect(url_for('auth.login'))
    
    return render_template('dashboard.html', username=session['username'])

@main_bp.route('/about')
def about():
    return render_template('about.html')
# api.py - API蓝图
from flask import Blueprint, jsonify, request

api_bp = Blueprint('api', __name__, url_prefix='/api/v1')

@api_bp.route('/users')
def get_users():
    users = [
        {'id': 1, 'name': 'Alice'},
        {'id': 2, 'name': 'Bob'}
    ]
    return jsonify(users)

@api_bp.route('/users/<int:user_id>')
def get_user(user_id):
    user = {'id': user_id, 'name': f'User {user_id}'}
    return jsonify(user)

# API错误处理
@api_bp.errorhandler(404)
def api_not_found(error):
    return jsonify({'error': 'Resource not found'}), 404

@api_bp.errorhandler(500)
def api_internal_error(error):
    return jsonify({'error': 'Internal server error'}), 500

3.5.2 蓝图注册和配置

# app.py - 主应用文件
from flask import Flask
from auth import auth_bp
from main import main_bp
from api import api_bp

def create_app():
    app = Flask(__name__)
    app.secret_key = 'your-secret-key'
    
    # 注册蓝图
    app.register_blueprint(main_bp)
    app.register_blueprint(auth_bp)
    app.register_blueprint(api_bp)
    
    return app

if __name__ == '__main__':
    app = create_app()
    app.run(debug=True)

3.5.3 蓝图高级特性

# admin.py - 管理员蓝图
from flask import Blueprint, render_template, request, session, abort

# 带模板文件夹和静态文件夹的蓝图
admin_bp = Blueprint('admin', __name__, 
                    url_prefix='/admin',
                    template_folder='templates/admin',
                    static_folder='static/admin',
                    static_url_path='/admin/static')

# 蓝图级别的请求处理器
@admin_bp.before_request
def require_admin():
    if not session.get('is_admin'):
        abort(403)

@admin_bp.after_request
def add_admin_headers(response):
    response.headers['X-Admin-Panel'] = 'true'
    return response

@admin_bp.route('/')
def dashboard():
    return render_template('dashboard.html')

@admin_bp.route('/users')
def manage_users():
    return render_template('users.html')

@admin_bp.route('/settings')
def settings():
    return render_template('settings.html')

# 蓝图级别的错误处理
@admin_bp.errorhandler(403)
def admin_forbidden(error):
    return render_template('admin_forbidden.html'), 403

3.5.4 蓝图组织结构

project/
├── app/
│   ├── __init__.py          # 应用工厂
│   ├── main/                # 主蓝图
│   │   ├── __init__.py
│   │   ├── views.py
│   │   └── forms.py
│   ├── auth/                # 认证蓝图
│   │   ├── __init__.py
│   │   ├── views.py
│   │   └── forms.py
│   ├── api/                 # API蓝图
│   │   ├── __init__.py
│   │   ├── users.py
│   │   └── posts.py
│   ├── admin/               # 管理蓝图
│   │   ├── __init__.py
│   │   ├── views.py
│   │   └── templates/
│   ├── templates/           # 全局模板
│   └── static/              # 全局静态文件
├── config.py
└── run.py
# app/__init__.py - 应用工厂
from flask import Flask

def create_app(config_name='default'):
    app = Flask(__name__)
    
    # 配置应用
    app.config.from_object(config[config_name])
    
    # 注册蓝图
    from .main import main_bp
    app.register_blueprint(main_bp)
    
    from .auth import auth_bp
    app.register_blueprint(auth_bp)
    
    from .api import api_bp
    app.register_blueprint(api_bp)
    
    from .admin import admin_bp
    app.register_blueprint(admin_bp)
    
    return app

3.6 URL生成和反向解析

3.6.1 url_for函数详解

from flask import url_for, redirect

@app.route('/')
def index():
    # 基本URL生成
    login_url = url_for('auth.login')
    user_url = url_for('show_user', username='john')
    
    # 带查询参数的URL
    search_url = url_for('search', q='flask', page=2)
    
    # 外部URL(需要请求上下文)
    external_url = url_for('index', _external=True)
    
    # HTTPS URL
    secure_url = url_for('index', _external=True, _scheme='https')
    
    return f'''
    <ul>
        <li><a href="{login_url}">登录</a></li>
        <li><a href="{user_url}">用户John</a></li>
        <li><a href="{search_url}">搜索</a></li>
        <li><a href="{external_url}">外部链接</a></li>
        <li><a href="{secure_url}">安全链接</a></li>
    </ul>
    '''

# 静态文件URL
@app.route('/assets')
def show_assets():
    css_url = url_for('static', filename='css/style.css')
    js_url = url_for('static', filename='js/app.js')
    img_url = url_for('static', filename='images/logo.png')
    
    return f'''
    <link rel="stylesheet" href="{css_url}">
    <script src="{js_url}"></script>
    <img src="{img_url}" alt="Logo">
    '''

3.6.2 URL构建最佳实践

# 辅助函数
def build_pagination_url(endpoint, page, **kwargs):
    """构建分页URL"""
    return url_for(endpoint, page=page, **kwargs)

def build_api_url(resource, resource_id=None, **params):
    """构建API URL"""
    if resource_id:
        endpoint = f'api.get_{resource}'
        return url_for(endpoint, id=resource_id, **params)
    else:
        endpoint = f'api.list_{resource}'
        return url_for(endpoint, **params)

# 模板中的URL生成
@app.template_global()
def modify_query(**new_values):
    """修改当前URL的查询参数"""
    args = request.args.copy()
    
    for key, value in new_values.items():
        if value is None:
            args.pop(key, None)
        else:
            args[key] = value
    
    return '{}?{}'.format(request.path, url_encode(args))

# 使用示例
@app.route('/products')
def list_products():
    page = request.args.get('page', 1, type=int)
    category = request.args.get('category')
    
    # 构建分页链接
    prev_url = build_pagination_url('list_products', page-1, category=category) if page > 1 else None
    next_url = build_pagination_url('list_products', page+1, category=category)
    
    return render_template('products.html', 
                         prev_url=prev_url, 
                         next_url=next_url)

3.7 请求钩子和上下文

3.7.1 请求钩子函数

from flask import g, request, session
import time

# 应用级钩子
@app.before_first_request
def before_first_request():
    """应用首次请求前执行"""
    app.logger.info('Application started')
    # 初始化数据库连接池
    # 加载配置缓存

@app.before_request
def before_request():
    """每个请求前执行"""
    g.start_time = time.time()
    g.user = None
    
    # 从session加载用户信息
    if 'user_id' in session:
        g.user = get_user_by_id(session['user_id'])
    
    # 记录请求信息
    app.logger.info(f'Request: {request.method} {request.url}')

@app.after_request
def after_request(response):
    """每个请求后执行"""
    # 计算请求处理时间
    if hasattr(g, 'start_time'):
        duration = time.time() - g.start_time
        response.headers['X-Response-Time'] = f'{duration:.4f}s'
    
    # 添加CORS头
    response.headers['Access-Control-Allow-Origin'] = '*'
    
    # 记录响应信息
    app.logger.info(f'Response: {response.status_code}')
    
    return response

@app.teardown_request
def teardown_request(exception):
    """请求结束时执行(无论是否有异常)"""
    if exception:
        app.logger.error(f'Request failed with exception: {exception}')
    
    # 清理资源
    if hasattr(g, 'db_connection'):
        g.db_connection.close()

@app.teardown_appcontext
def teardown_appcontext(exception):
    """应用上下文销毁时执行"""
    # 清理应用级资源
    pass

3.7.2 应用上下文和请求上下文

from flask import current_app, g, request, session

# 应用上下文示例
def get_config_value(key):
    """获取配置值"""
    return current_app.config.get(key)

def log_message(message):
    """记录日志"""
    current_app.logger.info(message)

# 请求上下文示例
def get_current_user():
    """获取当前用户"""
    if not hasattr(g, 'user'):
        user_id = session.get('user_id')
        g.user = get_user_by_id(user_id) if user_id else None
    return g.user

def is_authenticated():
    """检查用户是否已认证"""
    return get_current_user() is not None

def get_client_ip():
    """获取客户端IP"""
    if request.headers.get('X-Forwarded-For'):
        return request.headers.get('X-Forwarded-For').split(',')[0].strip()
    return request.remote_addr

# 上下文处理器
@app.context_processor
def inject_user():
    """向模板注入用户信息"""
    return {
        'current_user': get_current_user(),
        'is_authenticated': is_authenticated()
    }

@app.template_filter('datetime')
def datetime_filter(value, format='%Y-%m-%d %H:%M:%S'):
    """日期时间格式化过滤器"""
    if value is None:
        return ''
    return value.strftime(format)

本章小结

本章深入介绍了Flask的路由与视图系统,包括:

  1. 路由系统:基本路由定义、URL规则、参数类型、URL构建
  2. HTTP方法:GET、POST、PUT、DELETE等方法的处理
  3. 视图函数:基本视图、视图类、装饰器使用
  4. 错误处理:HTTP错误、自定义异常、全局异常处理
  5. 蓝图系统:蓝图创建、注册、组织结构
  6. URL生成:url_for函数、反向解析、最佳实践
  7. 请求钩子:请求生命周期、上下文管理

掌握这些概念是构建Flask应用的基础,合理的路由设计和视图组织能够让应用更加清晰和易于维护。

下一章预告

下一章我们将学习模板与静态文件,包括:

  • Jinja2模板引擎
  • 模板语法和过滤器
  • 模板继承和包含
  • 静态文件管理
  • 前端资源优化

练习题

  1. 路由设计:为一个博客系统设计RESTful路由
  2. 视图类:使用MethodView实现用户管理API
  3. 错误处理:创建统一的API错误响应格式
  4. 蓝图组织:将应用拆分为多个功能蓝图
  5. 装饰器:实现一个API限流装饰器