6.1 异步编程基础

6.1.1 JavaScript 异步模型

JavaScript 是单线程的,但通过事件循环机制支持异步操作:

// 同步 vs 异步
// Synchronous statement: logged first, in program order.
console.log('1. 同步操作开始');

// Asynchronous operation: the callback is handed to setTimeout and, even with
// a 0 ms delay, runs only after the surrounding synchronous code has finished.
setTimeout(() => {
    console.log('3. 异步操作执行');
}, 0);

// Still synchronous, so it is logged before the timer callback above.
console.log('2. 同步操作结束');

// 输出顺序:1 -> 2 -> 3

// 事件循环示例
/**
 * Logs a fixed sequence that illustrates event-loop ordering: synchronous
 * logs first, then promise callbacks (microtasks), then timer callbacks
 * (macrotasks).
 */
function demonstrateEventLoop() {
    console.log('Start');

    // Macrotasks: two zero-delay timers, scheduled in label order.
    for (const label of ['Timeout 1', 'Timeout 2']) {
        setTimeout(() => console.log(label), 0);
    }

    // Microtasks: two already-resolved promises, scheduled in label order.
    for (const label of ['Promise 1', 'Promise 2']) {
        Promise.resolve().then(() => console.log(label));
    }

    console.log('End');

    // Output: Start -> End -> Promise 1 -> Promise 2 -> Timeout 1 -> Timeout 2
}

6.1.2 回调函数的问题

// 回调地狱示例
/**
 * Simulates a user lookup that reports its outcome through a Node-style
 * callback after a 1000 ms timer.
 *
 * @param userId   Identifier to look up; the literal 'invalid' triggers the error path.
 * @param callback Receives `(error)` on failure or `(null, user)` on success.
 */
function fetchUserData(userId: string, callback: (error: Error | null, user?: User) => void) {
    const respond = (): void => {
        if (userId !== 'invalid') {
            callback(null, { id: userId, name: 'Alice', email: 'alice@example.com' });
            return;
        }
        callback(new Error('Invalid user ID'));
    };

    setTimeout(respond, 1000);
}

/**
 * Simulates loading a user's posts through a Node-style callback after a
 * 500 ms timer. This stub always succeeds.
 *
 * @param userId   Owner of the posts; copied into each returned post.
 * @param callback Receives `(null, posts)`.
 */
function fetchUserPosts(userId: string, callback: (error: Error | null, posts?: Post[]) => void) {
    setTimeout(() => {
        // `userId` is part of the `Post` shape declared in this file, so it must be present.
        callback(null, [{ id: '1', title: 'Hello World', content: 'First post', userId }]);
    }, 500);
}

/**
 * Simulates loading the comments of a post through a Node-style callback
 * after a 300 ms timer. This stub always succeeds.
 *
 * @param postId   Post the comments belong to; copied into each returned comment.
 * @param callback Receives `(null, comments)`.
 */
function fetchPostComments(postId: string, callback: (error: Error | null, comments?: Comment[]) => void) {
    setTimeout(() => {
        // `postId` is part of the `Comment` shape declared in this file, so it must be present.
        callback(null, [{ id: '1', text: 'Great post!', postId, author: 'Bob' }]);
    }, 300);
}

// 回调地狱
// Three dependent lookups chained through nested callbacks. Each level repeats
// the same "check error, log, return" boilerplate and adds one indent level.
fetchUserData('123', (userError, user) => {
    if (userError) {
        console.error('User error:', userError);
        return;
    }
    
    // `user` is optional in the callback type; the `!` relies on the error check above.
    fetchUserPosts(user!.id, (postsError, posts) => {
        if (postsError) {
            console.error('Posts error:', postsError);
            return;
        }
        
        // Only the first post's comments are requested.
        fetchPostComments(posts![0].id, (commentsError, comments) => {
            if (commentsError) {
                console.error('Comments error:', commentsError);
                return;
            }
            
            // All three results are only available here, at the innermost level.
            console.log('User:', user);
            console.log('Posts:', posts);
            console.log('Comments:', comments);
        });
    });
});

6.2 Promise 详解

6.2.1 Promise 基础

// Promise 的三种状态
/** String enum naming the three states a promise can be in. */
enum PromiseState {
    /** Not yet settled. */
    Pending = 'pending',
    /** Settled with a value. */
    Fulfilled = 'fulfilled',
    /** Settled with a rejection reason. */
    Rejected = 'rejected'
}

// 创建 Promise
/**
 * Builds a promise that settles after `delay` milliseconds.
 *
 * When the timer fires a random number is drawn: values greater than 0.1
 * fulfil the promise with `value`, anything else rejects it with
 * `Error('Random failure')`.
 *
 * @param value Value to fulfil with.
 * @param delay Timer duration in milliseconds (default 1000).
 */
function createPromise<T>(value: T, delay: number = 1000): Promise<T> {
    return new Promise<T>((resolve, reject) => {
        const settle = (): void => {
            const failed = !(Math.random() > 0.1);
            if (failed) {
                reject(new Error('Random failure'));
                return;
            }
            resolve(value);
        };

        setTimeout(settle, delay);
    });
}

// 使用 Promise
// Uses createPromise with its default delay.
const promise = createPromise('Hello, Promise!');

promise
    // Logs the fulfilled value and passes an upper-cased copy down the chain.
    .then(value => {
        console.log('Success:', value);
        return value.toUpperCase();
    })
    // Receives the value returned by the previous handler.
    .then(upperValue => {
        console.log('Transformed:', upperValue);
    })
    // Handles a rejection from the promise or from either handler above.
    .catch(error => {
        console.error('Error:', error.message);
    })
    // Runs whether the chain fulfilled or rejected.
    .finally(() => {
        console.log('Promise completed');
    });

6.2.2 Promise 类型定义

// 自定义 Promise 类型
/**
 * Simplified, illustrative shape of a promise-like object carrying a value of type `T`.
 */
interface CustomPromise<T> {
    /** Maps the fulfilled value to `U` (or a promise of `U`) and returns a promise of the result. */
    then<U>(onFulfilled?: (value: T) => U | Promise<U>): CustomPromise<U>;
    /** Maps a rejection reason to a recovery value `U`; the result carries either `T` or `U`. */
    catch<U>(onRejected?: (reason: any) => U | Promise<U>): CustomPromise<T | U>;
    /** Registers a callback that takes no arguments; the value type is unchanged. */
    finally(onFinally?: () => void): CustomPromise<T>;
}

// 泛型 Promise 工厂
/** Static helpers for common promise patterns: delay, timeout and retry. */
class PromiseFactory {
    /**
     * Resolves after `ms` milliseconds.
     *
     * @param ms Delay in milliseconds.
     */
    static delay(ms: number): Promise<void> {
        return new Promise(resolve => setTimeout(resolve, ms));
    }

    /**
     * Races `promise` against a timer.
     *
     * @param promise Promise to guard.
     * @param ms      Time budget in milliseconds.
     * @returns The settled result of `promise`, or a rejection with
     *          `Error('Timeout')` if the timer fires first.
     */
    static timeout<T>(promise: Promise<T>, ms: number): Promise<T> {
        let timer: ReturnType<typeof setTimeout> | undefined;
        const deadline = new Promise<never>((_, reject) => {
            timer = setTimeout(() => reject(new Error('Timeout')), ms);
        });

        // Clear the timer once the race is decided so it does not stay armed
        // (and keep the event loop alive) after `promise` has already settled.
        return Promise.race([promise, deadline]).finally(() => clearTimeout(timer));
    }

    /**
     * Calls `fn` until it succeeds or the attempt budget is used up.
     * The wait before attempt N+1 is `delay * N` milliseconds.
     *
     * @param fn          Operation to run; invoked once per attempt.
     * @param maxAttempts Maximum number of calls to `fn` (default 3).
     * @param delay       Base wait between attempts in milliseconds (default 1000).
     * @returns The first successful result.
     * @throws RangeError (as a rejection) when `maxAttempts` is below 1 or NaN.
     * @throws The error of the last attempt when every attempt fails.
     */
    static async retry<T>(
        fn: () => Promise<T>,
        maxAttempts: number = 3,
        delay: number = 1000
    ): Promise<T> {
        // Without this guard a budget below 1 would never call `fn` and never settle.
        if (!(maxAttempts >= 1)) {
            throw new RangeError('maxAttempts must be at least 1');
        }

        for (let attempt = 1; ; attempt++) {
            try {
                return await fn();
            } catch (error) {
                // Stop when another attempt would exceed the budget.
                if (attempt + 1 > maxAttempts) {
                    throw error;
                }
                // Referenced by class name so the method also works when detached from the class.
                await PromiseFactory.delay(delay * attempt);
            }
        }
    }
}

// 使用示例
// Give the request a 5000 ms budget; the catch handler sees both request
// failures and the timeout rejection.
PromiseFactory.timeout(
    fetch('https://api.example.com/data'),
    5000
).then(response => response.json())
 .catch(error => console.error('Request failed or timed out:', error));

// Up to 3 attempts with a base delay of 1000 ms between them.
PromiseFactory.retry(
    () => fetch('https://unreliable-api.com/data').then(r => r.json()),
    3,
    1000
).then(data => console.log('Data:', data))
 .catch(error => console.error('All attempts failed:', error));

6.2.3 Promise 组合方法

// Promise.all - 所有 Promise 都成功
/** User record used by the examples in this chapter. */
interface User {
    id: string;
    name: string;
    email: string;
}

/** Post record used by the examples in this chapter. */
interface Post {
    id: string;
    title: string;
    content: string;
    // Presumably the `id` of the owning User — the async fetchUserPosts stub fills it from its `userId` argument.
    userId: string;
}

/** Comment record used by the examples in this chapter. */
interface Comment {
    id: string;
    text: string;
    // Presumably the `id` of the Post being commented on — the async fetchPostComments stub fills it from its `postId` argument.
    postId: string;
    author: string;
}

/**
 * Promise-based user lookup stub: waits 1000 ms, then resolves with a fixed
 * user whose `id` is the given `userId`.
 */
async function fetchUserData(userId: string): Promise<User> {
    await PromiseFactory.delay(1000);

    const user: User = { id: userId, name: 'Alice', email: 'alice@example.com' };
    return user;
}

/**
 * Promise-based posts stub: waits 800 ms, then resolves with two fixed posts
 * owned by `userId`.
 */
async function fetchUserPosts(userId: string): Promise<Post[]> {
    await PromiseFactory.delay(800);

    const firstPost: Post = { id: '1', title: 'First Post', content: 'Hello World', userId };
    const secondPost: Post = { id: '2', title: 'Second Post', content: 'TypeScript is great', userId };
    return [firstPost, secondPost];
}

/**
 * Promise-based comments stub: waits 600 ms, then resolves with two fixed
 * comments attached to `postId`.
 */
async function fetchPostComments(postId: string): Promise<Comment[]> {
    await PromiseFactory.delay(600);

    const firstComment: Comment = { id: '1', text: 'Great post!', postId, author: 'Bob' };
    const secondComment: Comment = { id: '2', text: 'Thanks for sharing', postId, author: 'Charlie' };
    return [firstComment, secondComment];
}

// Promise.all 示例
/**
 * Loads a user and their posts in parallel, then the comments of every post
 * in parallel, logging each stage. Any failure is logged and swallowed.
 */
async function loadUserDashboard(userId: string) {
    try {
        const userRequest = fetchUserData(userId);
        const postsRequest = fetchUserPosts(userId);
        const [user, posts] = await Promise.all([userRequest, postsRequest]);

        console.log('User:', user);
        console.log('Posts:', posts);

        // One comments request per post, all started before any is awaited.
        const commentsPerPost = await Promise.all(
            posts.map(({ id }) => fetchPostComments(id))
        );

        console.log('All comments:', commentsPerPost.flat());
    } catch (failure) {
        console.error('Failed to load dashboard:', failure);
    }
}

// Promise.allSettled - 等待所有 Promise 完成(无论成功或失败)
/**
 * Starts three independent requests, waits for all of them to settle, and
 * logs each outcome by position: the value on success, the reason on failure.
 */
async function loadUserDashboardSafe(userId: string) {
    const outcomes = await Promise.allSettled([
        fetchUserData(userId),
        fetchUserPosts(userId),
        fetch('https://api.example.com/notifications').then(r => r.json())
    ]);

    for (let index = 0; index < outcomes.length; index++) {
        const outcome = outcomes[index];
        if (outcome.status !== 'fulfilled') {
            console.error(`Error ${index}:`, outcome.reason);
            continue;
        }
        console.log(`Result ${index}:`, outcome.value);
    }
}

// Promise.race - 第一个完成的 Promise
/**
 * Requests both URLs at once and uses whichever response settles first.
 *
 * `Promise.race` adopts the first settled promise, success or failure: if the
 * faster request rejects, this function rejects even though the slower one
 * might still succeed.
 *
 * @param primaryUrl  First URL to request.
 * @param fallbackUrl Second URL, requested concurrently.
 * @returns The parsed JSON body of the first response to arrive.
 * @throws Whatever the first-settled request (or its JSON parsing) rejects with.
 */
async function fetchWithFallback(primaryUrl: string, fallbackUrl: string) {
    try {
        const result = await Promise.race([
            fetch(primaryUrl),
            fetch(fallbackUrl)
        ]);
        return await result.json();
    } catch (error) {
        // Only the first-settled request is known to have failed at this point,
        // so the message must not claim that both did.
        console.error('Request failed:', error);
        throw error;
    }
}

// Promise.any - 第一个成功的 Promise
/**
 * Requests every URL at once and returns the parsed JSON body of the first
 * request that fulfils. Failures are logged and rethrown.
 */
async function fetchFromMultipleSources(urls: string[]) {
    try {
        const requests = urls.map(url => fetch(url));
        const firstSuccess = await Promise.any(requests);
        const body = await firstSuccess.json();
        return body;
    } catch (failure) {
        console.error('All requests failed:', failure);
        throw failure;
    }
}

6.3 async/await 详解

6.3.1 async/await 基础

// async 函数总是返回 Promise
/** Minimal async function: the returned string is delivered through a promise. */
async function simpleAsync(): Promise<string> {
    const greeting = 'Hello, async!';
    return greeting;
}

// 等价于
/** Non-async counterpart of `simpleAsync`: wraps the same string with `Promise.resolve`. */
function simplePromise(): Promise<string> {
    const greeting = 'Hello, async!';
    return Promise.resolve(greeting);
}

// await 只能在 async 函数内部使用(ES 模块的顶层除外,见下文“顶级 await”)
/**
 * Walks through three uses of `await`: a plain async call, a delayed value,
 * and two requests awaited together. Any failure is logged and swallowed.
 */
async function demonstrateAwait() {
    try {
        const greeting = await simpleAsync();
        console.log(greeting);

        const delayed = await PromiseFactory.delay(1000).then(() => 'Delayed result');
        console.log(delayed);

        // Both requests are started before either is awaited, so they run in parallel.
        const userRequest = fetchUserData('123');
        const postsRequest = fetchUserPosts('123');
        const [user, posts] = await Promise.all([userRequest, postsRequest]);

        console.log('User and posts loaded:', { user, posts });
    } catch (failure) {
        console.error('Error in async function:', failure);
    }
}

// 顶级 await(ES2022+)
// 在模块顶层可以直接使用 await
// Top-level await: execution of the rest of this module waits for the config request.
const config = await fetch('/api/config').then(r => r.json());
console.log('Config loaded:', config);

6.3.2 错误处理

// 多种错误处理方式
/**
 * Error for a request that completed with a non-success HTTP status.
 * Carries the status code and, optionally, the response payload.
 */
class ApiError extends Error {
    public statusCode: number;
    public response?: any;

    constructor(message: string, statusCode: number, response?: any) {
        super(message);
        this.statusCode = statusCode;
        this.response = response;
        this.name = 'ApiError';
    }
}

/** Error wrapping a lower-level failure; the wrapped error is kept in `originalError`. */
class NetworkError extends Error {
    public originalError: Error;

    constructor(message: string, originalError: Error) {
        super(message);
        this.originalError = originalError;
        this.name = 'NetworkError';
    }
}

/**
 * Fetches `url` and returns its parsed JSON body.
 *
 * @throws ApiError     when the response status is not OK; carries the status and the body text.
 * @throws NetworkError when a TypeError is raised while fetching or reading the response.
 * @throws Any other error unchanged.
 */
async function fetchWithErrorHandling(url: string): Promise<any> {
    try {
        const response = await fetch(url);

        if (response.ok) {
            return await response.json();
        }

        const message = `HTTP ${response.status}: ${response.statusText}`;
        const status = response.status;
        const body = await response.text();
        throw new ApiError(message, status, body);
    } catch (error) {
        // Anything that is not a TypeError (including the ApiError above) passes through untouched.
        if (!(error instanceof TypeError)) {
            throw error;
        }

        // A TypeError is treated as a network-level failure.
        throw new NetworkError('Network request failed', error);
    }
}

// 使用 try-catch 处理错误
/**
 * Calls the API and reacts to each error class separately: ApiError (with a
 * login redirect on 401), NetworkError (offline notice), anything else (logged).
 */
async function handleApiCall() {
    try {
        const data = await fetchWithErrorHandling('https://api.example.com/data');
        console.log('Data received:', data);
    } catch (error) {
        if (error instanceof ApiError) {
            console.error(`API Error ${error.statusCode}: ${error.message}`);
            // 401 is handled as an authentication failure.
            if (error.statusCode === 401) {
                redirectToLogin();
            }
            return;
        }

        if (error instanceof NetworkError) {
            console.error('Network Error:', error.message);
            showOfflineMessage();
            return;
        }

        console.error('Unexpected error:', error);
    }
}

// 错误边界模式
/** Runs async operations so that failures produce a fallback value instead of a rejection. */
class AsyncErrorBoundary {
    /**
     * Awaits `fn` and returns its result; on failure reports the error and
     * returns `fallback` instead.
     *
     * @param fn       Operation to run.
     * @param fallback Value returned when `fn` fails (`undefined` if omitted).
     * @param onError  Optional error reporter. When omitted, the failure is
     *                 logged with `console.error`.
     * @returns The result of `fn`, or `fallback` on failure.
     */
    static async safeExecute<T>(
        fn: () => Promise<T>,
        fallback?: T,
        onError?: (error: Error) => void
    ): Promise<T | undefined> {
        try {
            return await fn();
        } catch (error) {
            if (onError) {
                // Anything can be thrown in JavaScript. Wrap non-Error values so the
                // handler really receives the `Error` its signature promises.
                onError(error instanceof Error ? error : new Error(String(error)));
            } else {
                console.error('Async operation failed:', error);
            }
            return fallback;
        }
    }
}

// 使用错误边界
// Arguments in order: the operation, the fallback user returned if it fails,
// and a reporter that receives the failure.
const userData = await AsyncErrorBoundary.safeExecute(
    () => fetchUserData('123'),
    { id: '123', name: 'Unknown', email: 'unknown@example.com' },
    (error) => console.error('Failed to fetch user:', error)
);

/** Placeholder for a login redirect; only logs a message. */
function redirectToLogin(): void {
    const message = 'Redirecting to login...';
    console.log(message);
}

/** Placeholder for an offline notice; only logs a message. */
function showOfflineMessage(): void {
    const message = 'You appear to be offline';
    console.log(message);
}

6.3.3 并发控制

// 并发限制器
/**
 * Runs async tasks with at most `maxConcurrency` of them in flight.
 * Tasks beyond the limit wait in a FIFO queue.
 */
class ConcurrencyLimiter {
    private running = 0;
    private queue: Array<() => void> = [];

    /**
     * @param maxConcurrency Maximum number of tasks running at once.
     * @throws RangeError when `maxConcurrency` is below 1 or NaN. Such a limit
     *         could never start a task, so every `execute` call would hang.
     */
    constructor(private maxConcurrency: number) {
        if (!(maxConcurrency >= 1)) {
            throw new RangeError('maxConcurrency must be at least 1');
        }
    }

    /**
     * Schedules `fn`, starting it immediately if a slot is free.
     *
     * @param fn Task to run.
     * @returns A promise that settles with the task's own result or error.
     */
    async execute<T>(fn: () => Promise<T>): Promise<T> {
        return new Promise<T>((resolve, reject) => {
            const task = async () => {
                // Claimed synchronously, before the first await, so that callers
                // scheduling in the same tick see the slot as taken.
                this.running++;
                try {
                    resolve(await fn());
                } catch (error) {
                    reject(error);
                } finally {
                    this.running--;
                    this.processQueue();
                }
            };

            if (this.running < this.maxConcurrency) {
                task();
            } else {
                this.queue.push(task);
            }
        });
    }

    /** Starts the oldest queued task if a slot is free. */
    private processQueue() {
        if (this.queue.length > 0 && this.running < this.maxConcurrency) {
            const task = this.queue.shift()!;
            task();
        }
    }
}

// 批处理器
/**
 * Collects individually added items and processes them together.
 *
 * A batch is flushed when it reaches `batchSize`, or when `batchTimeout`
 * milliseconds pass without a new item. Each caller receives the result at
 * the same position as its item in the batch.
 */
class BatchProcessor<T, R> {
    // Every waiting caller keeps its own settle functions, so that all callers
    // in a batch are answered — not only the one that triggered the flush.
    private pending: Array<{
        item: T;
        resolve: (value: R) => void;
        reject: (reason: unknown) => void;
    }> = [];
    // ReturnType<typeof setTimeout> is valid under both browser and Node typings.
    private timer: ReturnType<typeof setTimeout> | null = null;

    /**
     * @param batchSize    Number of items that triggers an immediate flush.
     * @param batchTimeout Quiet period in milliseconds after which a partial batch is flushed.
     * @param processor    Maps a batch of items to results in the same order.
     */
    constructor(
        private batchSize: number,
        private batchTimeout: number,
        private processor: (items: T[]) => Promise<R[]>
    ) {}

    /**
     * Queues `item` for the next batch.
     *
     * @returns The result the processor produced for this item; rejects if the
     *          processor fails for the batch containing it.
     */
    async add(item: T): Promise<R> {
        return new Promise<R>((resolve, reject) => {
            this.pending.push({ item, resolve, reject });

            // Flush immediately once the batch is full.
            if (this.pending.length >= this.batchSize) {
                this.flush();
                return;
            }

            // Otherwise restart the quiet-period timer.
            if (this.timer !== null) {
                clearTimeout(this.timer);
            }
            this.timer = setTimeout(() => this.flush(), this.batchTimeout);
        });
    }

    /** Hands the current batch to the processor and settles every waiting caller. */
    private flush(): void {
        if (this.timer !== null) {
            clearTimeout(this.timer);
            this.timer = null;
        }

        const entries = this.pending;
        this.pending = [];
        if (entries.length === 0) {
            return;
        }

        this.processBatch(entries.map(entry => entry.item))
            .then(results => {
                entries.forEach((entry, index) => entry.resolve(results[index]));
            })
            // Also reached when the processor returns something that is not indexable;
            // rejecting an already-resolved entry is a no-op.
            .catch(error => {
                entries.forEach(entry => entry.reject(error));
            });
    }

    /** Invokes the processor; a synchronous throw becomes a rejection. */
    private async processBatch(items: T[]): Promise<R[]> {
        return await this.processor(items);
    }
}

// 使用示例
// At most 3 requests in flight at any time.
const limiter = new ConcurrencyLimiter(3);

// Five URLs, so two of the requests have to wait for a free slot.
const urls = [
    'https://api.example.com/data1',
    'https://api.example.com/data2',
    'https://api.example.com/data3',
    'https://api.example.com/data4',
    'https://api.example.com/data5'
];

// Every request goes through the limiter; Promise.all keeps results in URL order.
const results = await Promise.all(
    urls.map(url => 
        limiter.execute(() => fetch(url).then(r => r.json()))
    )
);

console.log('All requests completed:', results);

// 批处理示例
const batchProcessor = new BatchProcessor(
    5, // batch size
    1000, // timeout in milliseconds
    async (userIds: string[]) => {
        // Fetch all users of the batch with a single request.
        const response = await fetch('/api/users/batch', {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({ userIds })
        });
        return await response.json();
    }
);

// Add individual user requests.
// NOTE(review): each add() is awaited before the next one is issued, so every
// call waits out the 1000 ms timeout alone and is sent as a batch of one.
// Issuing the add() calls concurrently appears to be required for real
// batching — confirm the intent.
const user1 = await batchProcessor.add('user1');
const user2 = await batchProcessor.add('user2');
const user3 = await batchProcessor.add('user3');

6.4 生成器和异步迭代器

6.4.1 生成器函数

// 基本生成器
/** Endless generator yielding 0, 1, 2, ... one value per `next()` call. */
function* numberGenerator(): Generator<number, void, unknown> {
    for (let current = 0; ; current++) {
        yield current;
    }
}

// Each next() call resumes the generator until its next yield.
const numbers = numberGenerator();
console.log(numbers.next().value); // 0
console.log(numbers.next().value); // 1
console.log(numbers.next().value); // 2

// 有限生成器
/**
 * Yields every integer step from `start` to `end` inclusive.
 *
 * @returns `end - start + 1` as the generator's return value, delivered with
 *          the final `{ done: true }` result.
 */
function* rangeGenerator(start: number, end: number): Generator<number, number, unknown> {
    let current = start;
    while (current <= end) {
        yield current;
        current += 1;
    }
    return end - start + 1; // return value
}

// for...of consumes only the yielded values; the generator's return value is not part of the loop.
const range = rangeGenerator(1, 5);
for (const num of range) {
    console.log(num); // 1, 2, 3, 4, 5
}

// 生成器可以接收值
/**
 * Endless counter that can be steered through `next(input)`.
 *
 * A truthy `input` is parsed as a base-10 integer and becomes the new value
 * (0 if it does not parse); otherwise the value is incremented by one.
 */
function* twoWayGenerator(): Generator<number, void, string> {
    let value = 0;
    for (;;) {
        const input = yield value;
        value = input ? (parseInt(input, 10) || 0) : value + 1;
    }
}

// The argument of next() becomes the result of the paused `yield` expression inside the generator.
const twoWay = twoWayGenerator();
console.log(twoWay.next().value); // 0
console.log(twoWay.next('10').value); // 10
console.log(twoWay.next().value); // 11

6.4.2 异步生成器

// 异步生成器
/** Async generator yielding 0 through 4, waiting 1000 ms before each value. */
async function* asyncNumberGenerator(): AsyncGenerator<number, void, unknown> {
    for (let current = 0; current < 5; current++) {
        await PromiseFactory.delay(1000);
        yield current;
    }
}

// 使用异步生成器
/** Iterates the async number generator with `for await` and logs every value. */
async function consumeAsyncGenerator() {
    const source = asyncNumberGenerator();
    for await (const value of source) {
        console.log('Async number:', value);
    }
}

// 数据流处理
/**
 * Fetches the URLs one after another and yields each parsed JSON body.
 * A URL that fails is logged and skipped; the stream continues with the next one.
 */
async function* fetchDataStream(urls: string[]): AsyncGenerator<any, void, unknown> {
    for (let index = 0; index < urls.length; index++) {
        const url = urls[index];
        try {
            const payload = await fetch(url).then(response => response.json());
            yield payload;
        } catch (failure) {
            console.error(`Failed to fetch ${url}:`, failure);
        }
    }
}

// 分页数据获取
/**
 * Yields one page of items at a time from a paginated endpoint.
 *
 * Requests `${baseUrl}?page=N&size=pageSize` starting at page 1 and stops
 * when a page has no items, when the response's `hasMore` is falsy, or when a
 * request fails (the failure is logged, not thrown).
 *
 * @param baseUrl  Endpoint URL without query string.
 * @param pageSize Items per page (default 10).
 */
async function* paginatedFetch<T>(
    baseUrl: string,
    pageSize: number = 10
): AsyncGenerator<T[], void, unknown> {
    let page = 1;

    for (;;) {
        try {
            const response = await fetch(`${baseUrl}?page=${page}&size=${pageSize}`);
            const body = await response.json();

            const hasItems = body.items && body.items.length > 0;
            if (!hasItems) {
                return;
            }

            yield body.items;

            if (!body.hasMore) {
                return;
            }
            page += 1;
        } catch (failure) {
            console.error('Pagination fetch error:', failure);
            return;
        }
    }
}

// 使用分页获取
/**
 * Drains the paginated `/api/items` endpoint (20 items per page) into one
 * array, logging progress after each page.
 *
 * @returns Every item from every page, in arrival order.
 */
async function loadAllData() {
    const collected: any[] = [];
    const pages = paginatedFetch('/api/items', 20);

    for await (const page of pages) {
        collected.push(...page);
        console.log(`Loaded ${page.length} items, total: ${collected.length}`);
    }

    return collected;
}

6.4.3 异步迭代器模式

// 自定义异步迭代器
/**
 * Async iterable that loads a full data set once per iteration and yields its
 * items in chunks of `batchSize`, pausing 100 ms between chunks.
 */
class AsyncDataLoader<T> implements AsyncIterable<T> {
    /**
     * @param dataSource Loads the complete data set; called once per iteration.
     * @param batchSize  Items yielded between pauses (default 10).
     * @throws RangeError when `batchSize` is below 1 or NaN. A step of zero or
     *         less would never advance through the data.
     */
    constructor(
        private dataSource: () => Promise<T[]>,
        private batchSize: number = 10
    ) {
        if (!(batchSize >= 1)) {
            throw new RangeError('batchSize must be at least 1');
        }
    }

    async *[Symbol.asyncIterator](): AsyncIterator<T> {
        const data = await this.dataSource();

        for (let start = 0; start < data.length; start += this.batchSize) {
            const batch = data.slice(start, start + this.batchSize);

            for (const item of batch) {
                yield item;
            }

            // Pause between batches, but not after the last one.
            if (start + this.batchSize < data.length) {
                await PromiseFactory.delay(100);
            }
        }
    }
}

// 实时数据流
/**
 * Exposes the messages of an EventSource or WebSocket as an async iterable.
 *
 * Every message is JSON-parsed and appended to an internal buffer. Each
 * iteration reads the buffer from its start and then waits for new messages.
 * The buffer is never trimmed, so it grows for as long as messages arrive.
 */
class RealTimeDataStream<T> implements AsyncIterable<T> {
    // Resolvers of iterations that are currently waiting for data.
    private waiters: Array<() => void> = [];
    private buffer: T[] = [];
    private isActive = false;

    /** @param source Message source; its `onmessage` handler is replaced by this stream. */
    constructor(private source: EventSource | WebSocket) {
        this.setupEventHandlers();
    }

    /** Installs the same JSON-parsing message handler on either kind of source. */
    private setupEventHandlers() {
        const handleMessage = (event: { data: string }) => {
            this.emit(JSON.parse(event.data));
        };

        if (this.source instanceof EventSource) {
            this.source.onmessage = handleMessage;
        } else if (this.source instanceof WebSocket) {
            this.source.onmessage = handleMessage;
        }
    }

    /** Buffers one message and wakes every waiting iteration. */
    private emit(data: T) {
        this.buffer.push(data);
        this.wakeWaiters();
    }

    /** Resolves all pending waits exactly once. */
    private wakeWaiters() {
        const waiting = this.waiters;
        this.waiters = [];
        waiting.forEach(wake => wake());
    }

    async *[Symbol.asyncIterator](): AsyncIterator<T> {
        this.isActive = true;
        let index = 0;

        while (this.isActive) {
            if (index < this.buffer.length) {
                yield this.buffer[index++];
            } else {
                // Wait for new data, or for stop() to end the loop.
                await new Promise<void>(resolve => {
                    this.waiters.push(resolve);
                });
            }
        }
    }

    /**
     * Ends active iterations. Waiting iterations are woken so that they observe
     * the stop immediately instead of hanging until the next message arrives.
     */
    stop() {
        this.isActive = false;
        this.wakeWaiters();
    }
}

// 使用示例
/**
 * Builds a loader over 100 generated records (chunks of 5) and logs every
 * record as it is yielded.
 */
async function processDataStream() {
    // Simulated data source: ids 0..99, each with a random value.
    const generateRecords = async () =>
        Array.from({ length: 100 }, (_, i) => ({ id: i, value: Math.random() }));

    const loader = new AsyncDataLoader(generateRecords, 5);

    for await (const record of loader) {
        console.log('Processing item:', record);
        // Per-record processing goes here.
    }
}

6.5 高级异步模式

6.5.1 发布订阅模式

// 类型安全的事件发射器
/** Maps an event name to the tuple/array of arguments passed to its listeners. */
type EventMap = {
    [K: string]: any[];
};

/**
 * Event emitter whose event names and listener arguments are checked against
 * the event map `T` (event name -> argument tuple).
 *
 * The constraint is written as `Record<keyof T, any[]>` rather than an
 * index-signature type so that event maps declared with `interface` are
 * accepted as well as type aliases.
 */
class TypedEventEmitter<T extends Record<keyof T, any[]>> {
    private listeners = new Map<keyof T, Array<(...args: any[]) => void | Promise<void>>>();

    /** Registers `listener` for `event`. */
    on<K extends keyof T>(event: K, listener: (...args: T[K]) => void | Promise<void>): void {
        const registered = this.listeners.get(event);
        if (registered) {
            registered.push(listener);
        } else {
            this.listeners.set(event, [listener]);
        }
    }

    /** Removes every registration of `listener` for `event`. */
    off<K extends keyof T>(event: K, listener: (...args: T[K]) => void | Promise<void>): void {
        const registered = this.listeners.get(event);
        if (registered) {
            // Replace rather than mutate, so an emit in progress keeps iterating its own array.
            this.listeners.set(event, registered.filter(l => l !== listener));
        }
    }

    /**
     * Calls every listener of `event` with `args`.
     *
     * @returns A promise that resolves once all listeners (including async
     *          ones) have finished, and rejects if any of them fails.
     */
    async emit<K extends keyof T>(event: K, ...args: T[K]): Promise<void> {
        const registered = this.listeners.get(event);
        if (!registered) {
            return;
        }
        await Promise.all(registered.map(listener => Promise.resolve(listener(...args))));
    }

    /**
     * Resolves with the arguments of the next `event` emitted after this call.
     * Events emitted earlier are not replayed.
     */
    once<K extends keyof T>(event: K): Promise<T[K]> {
        return new Promise(resolve => {
            const listener = (...args: T[K]) => {
                this.off(event, listener);
                resolve(args);
            };
            this.on(event, listener);
        });
    }
}

// 定义事件类型
/**
 * Event map of the application: event name -> listener argument tuple.
 *
 * Declared as a type alias rather than an interface: interfaces have no
 * implicit index signature, so they are not assignable to index-signature
 * constraints such as `{ [K: string]: any[] }`.
 */
type AppEvents = {
    'user:login': [{ userId: string; timestamp: Date }];
    'user:logout': [{ userId: string }];
    'data:updated': [{ type: string; data: any }];
    'error': [Error];
};

// 使用类型安全的事件发射器
const eventEmitter = new TypedEventEmitter<AppEvents>();

// Listen for events
eventEmitter.on('user:login', async ({ userId, timestamp }) => {
    console.log(`User ${userId} logged in at ${timestamp}`);
    // Handle the login asynchronously
    await updateUserStatus(userId, 'online');
});

eventEmitter.on('data:updated', async ({ type, data }) => {
    console.log(`Data updated: ${type}`, data);
    // Handle the data update asynchronously
    await syncDataToServer(type, data);
});

// Wait for a single event. once() only sees events emitted after it is
// called, so it must be subscribed BEFORE the emit below; awaiting it after
// the only emit would never resolve.
const nextLogin = eventEmitter.once('user:login');

// Emit events
await eventEmitter.emit('user:login', { userId: '123', timestamp: new Date() });
await eventEmitter.emit('data:updated', { type: 'user', data: { name: 'Alice' } });

const [loginData] = await nextLogin;
console.log('User logged in:', loginData);

/** Placeholder for a status update; only logs the user id and the new status. */
async function updateUserStatus(userId: string, status: string): Promise<void> {
    const message = `Updating user ${userId} status to ${status}`;
    console.log(message);
}

/** Placeholder for a server sync; only logs the data type and payload. */
async function syncDataToServer(type: string, data: any): Promise<void> {
    const label = `Syncing ${type} data to server:`;
    console.log(label, data);
}

6.5.2 响应式编程模式

// 简单的 Observable 实现
/**
 * Minimal push-based stream. The producer function runs once per
 * subscription and may return a teardown function.
 */
class Observable<T> {
    /**
     * @param subscriber Producer invoked on every `subscribe`; pushes values to
     *                   the observer and optionally returns a teardown function.
     */
    constructor(
        private subscriber: (observer: Observer<T>) => (() => void) | void
    ) {}

    /**
     * Starts the producer for `observer`.
     *
     * Missing handlers default to no-ops, except `error`, whose default
     * rethrows the error.
     *
     * @returns A subscription whose `unsubscribe` runs the producer's teardown (if any).
     */
    subscribe(observer: Partial<Observer<T>>): Subscription {
        const fullObserver: Observer<T> = {
            next: observer.next ?? (() => {}),
            error: observer.error ?? ((err) => { throw err; }),
            complete: observer.complete ?? (() => {})
        };

        const unsubscribe = this.subscriber(fullObserver) ?? (() => {});

        return {
            unsubscribe
        };
    }

    /** Returns a stream that emits `fn(value)` for every source value. */
    map<U>(fn: (value: T) => U): Observable<U> {
        return new Observable<U>(observer => {
            return this.subscribe({
                next: value => observer.next(fn(value)),
                error: err => observer.error(err),
                complete: () => observer.complete()
            }).unsubscribe;
        });
    }

    /** Returns a stream that only emits the source values accepted by `predicate`. */
    filter(predicate: (value: T) => boolean): Observable<T> {
        return new Observable<T>(observer => {
            return this.subscribe({
                next: value => {
                    if (predicate(value)) {
                        observer.next(value);
                    }
                },
                error: err => observer.error(err),
                complete: () => observer.complete()
            }).unsubscribe;
        });
    }

    /**
     * Wraps a promise: emits its value then completes, or reports its rejection.
     * Nothing is delivered after `unsubscribe`.
     */
    static fromPromise<T>(promise: Promise<T>): Observable<T> {
        return new Observable<T>(observer => {
            let active = true;

            // Two-argument then(): an exception thrown by the observer's own
            // next/complete handler is not misreported as a promise rejection.
            promise.then(
                value => {
                    if (!active) {
                        return;
                    }
                    observer.next(value);
                    observer.complete();
                },
                err => {
                    if (active) {
                        observer.error(err);
                    }
                }
            );

            // A promise cannot be cancelled, so unsubscribing only mutes delivery.
            return () => {
                active = false;
            };
        });
    }

    /** Emits 0, 1, 2, ... every `ms` milliseconds until unsubscribed. */
    static interval(ms: number): Observable<number> {
        return new Observable<number>(observer => {
            let count = 0;
            const timer = setInterval(() => {
                observer.next(count++);
            }, ms);

            return () => clearInterval(timer);
        });
    }
}

/** The three callbacks a stream consumer can provide. */
interface Observer<T> {
    /** Receives a value from the stream. */
    next: (value: T) => void;
    /** Receives an error from the stream. */
    error: (err: any) => void;
    /** Called without arguments when the stream signals completion. */
    complete: () => void;
}

/** Handle returned by `subscribe`. */
interface Subscription {
    /** Takes no arguments and returns nothing; presumably stops delivery to the observer — see `subscribe`. */
    unsubscribe: () => void;
}

// 使用 Observable
// Tick every second, double each tick, keep only multiples of 4.
const numbers$ = Observable.interval(1000)
    .map(n => n * 2)
    .filter(n => n % 4 === 0);

// Nothing runs until subscribe() is called.
const subscription = numbers$.subscribe({
    next: value => console.log('Received:', value),
    error: err => console.error('Error:', err),
    complete: () => console.log('Completed')
});

// Unsubscribe after 5 seconds.
setTimeout(() => {
    subscription.unsubscribe();
    console.log('Unsubscribed');
}, 5000);

// 从 Promise 创建 Observable
// The request starts here, when the promise is created — not when the observable is subscribed.
const userPromise = fetchUserData('123');
const user$ = Observable.fromPromise(userPromise);

// Only `next` and `error` are supplied; `complete` falls back to the default handler.
user$.subscribe({
    next: user => console.log('User loaded:', user),
    error: err => console.error('Failed to load user:', err)
});

6.5.3 状态机模式

// 异步状态机
/** States of the fetch state machine. */
type State = 'idle' | 'loading' | 'success' | 'error';
/**
 * Events accepted by the fetch state machine.
 * NOTE(review): shares its name with the DOM `Event` type — confirm the shadowing is intended.
 */
type Event = 'FETCH' | 'SUCCESS' | 'ERROR' | 'RESET';

/**
 * Generic contract of an asynchronous state machine with states `TState`,
 * events `TEvent` and attached data `TContext`.
 */
interface StateMachine<TState, TEvent, TContext> {
    /** Current state. */
    state: TState;
    /** Data associated with the current state. */
    context: TContext;
    /** Feeds an event (with optional payload) into the machine. */
    transition(event: TEvent, payload?: any): Promise<void>;
    /** Registers a callback that receives a state and context. */
    onStateChange(callback: (state: TState, context: TContext) => void): void;
}

/**
 * State machine for a single fetch: idle -> loading -> success | error.
 *
 * - FETCH (payload: URL) is accepted in `idle`, `success` and `error`.
 * - SUCCESS / ERROR are accepted only in `loading` and are raised internally
 *   once the request settles.
 * - RESET is accepted in `success` and `error`.
 * Events that do not apply to the current state are ignored.
 *
 * Listeners are notified exactly once per state change.
 */
class AsyncStateMachine implements StateMachine<State, Event, { data?: any; error?: Error }> {
    state: State = 'idle';
    context: { data?: any; error?: Error } = {};

    private listeners: Array<(state: State, context: { data?: any; error?: Error }) => void> = [];

    /**
     * Feeds an event into the machine.
     *
     * @param event   Event to process.
     * @param payload URL for FETCH; data for SUCCESS; error for ERROR.
     * @returns A promise that resolves once the transition — including the
     *          request started by FETCH — has finished.
     */
    async transition(event: Event, payload?: any): Promise<void> {
        switch (this.state) {
            case 'idle':
                if (event === 'FETCH') {
                    await this.runFetch(payload);
                }
                break;

            case 'loading':
                if (event === 'SUCCESS') {
                    this.setState('success', { data: payload });
                } else if (event === 'ERROR') {
                    this.setState('error', { error: payload });
                }
                break;

            case 'success':
            case 'error':
                if (event === 'RESET') {
                    this.setState('idle', {});
                } else if (event === 'FETCH') {
                    await this.runFetch(payload);
                }
                break;
        }
    }

    /** Registers a listener called with the new state and context after every change. */
    onStateChange(callback: (state: State, context: { data?: any; error?: Error }) => void): void {
        this.listeners.push(callback);
    }

    /** Single place where state changes, so that each change notifies exactly once. */
    private setState(state: State, context: { data?: any; error?: Error }): void {
        this.state = state;
        this.context = context;
        this.notifyListeners();
    }

    /** Enters `loading`, performs the request, then raises SUCCESS or ERROR. */
    private async runFetch(url: string): Promise<void> {
        this.setState('loading', {});

        let data: any;
        try {
            data = await this.fetchData(url);
        } catch (error) {
            await this.transition('ERROR', error);
            return;
        }
        // Outside the try block: an exception thrown by a listener during the
        // SUCCESS notification must not be mistaken for a request failure.
        await this.transition('SUCCESS', data);
    }

    private notifyListeners(): void {
        this.listeners.forEach(listener => listener(this.state, this.context));
    }

    /** Fetches `url` and returns the parsed JSON body; throws on a non-OK status. */
    private async fetchData(url: string): Promise<any> {
        const response = await fetch(url);
        if (!response.ok) {
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        return await response.json();
    }
}

// 使用状态机
const stateMachine = new AsyncStateMachine();

// Drive the UI from state changes: one branch per state.
stateMachine.onStateChange((state, context) => {
    console.log('State changed:', state, context);
    
    switch (state) {
        case 'loading':
            showLoadingSpinner();
            break;
        case 'success':
            hideLoadingSpinner();
            displayData(context.data);
            break;
        case 'error':
            hideLoadingSpinner();
            showError(context.error);
            break;
        case 'idle':
            hideLoadingSpinner();
            clearDisplay();
            break;
    }
});

// Trigger a state transition; the payload of FETCH is the URL to request.
await stateMachine.transition('FETCH', 'https://api.example.com/data');

/** UI placeholder: logs that the spinner is shown. */
function showLoadingSpinner(): void {
    const message = 'Showing loading spinner...';
    console.log(message);
}

/** UI placeholder: logs that the spinner is hidden. */
function hideLoadingSpinner(): void {
    const message = 'Hiding loading spinner...';
    console.log(message);
}

/** UI placeholder: logs the data that would be rendered. */
function displayData(data: any): void {
    const label = 'Displaying data:';
    console.log(label, data);
}

/** UI placeholder: logs the error message (or `undefined` when no error is given). */
function showError(error: Error | undefined): void {
    const message = error?.message;
    console.error('Showing error:', message);
}

/** UI placeholder: logs that the display is cleared. */
function clearDisplay(): void {
    const message = 'Clearing display...';
    console.log(message);
}

6.6 性能优化

6.6.1 异步操作优化

// 缓存机制
/**
 * Caches the promise of an async computation per key for a limited time.
 *
 * Concurrent callers of the same key share one in-flight promise. An entry is
 * evicted when its TTL elapses or, immediately, when its promise rejects.
 */
class AsyncCache<K, V> {
    // The timer is stored with the promise so it can be cancelled when the
    // entry is removed early.
    private cache = new Map<K, { promise: Promise<V>; timer: ReturnType<typeof setTimeout> }>();
    private ttl: number;

    /** @param ttl Lifetime of an entry in milliseconds (default 5 minutes). */
    constructor(ttl: number = 5 * 60 * 1000) {
        this.ttl = ttl;
    }

    /**
     * Returns the cached result for `key`, creating it with `factory` on a miss.
     *
     * @param key     Cache key.
     * @param factory Produces the value; called only on a miss.
     * @throws Whatever the cached or newly created promise rejects with.
     */
    async get(key: K, factory: () => Promise<V>): Promise<V> {
        const cached = this.cache.get(key);
        if (cached) {
            return cached.promise;
        }

        const promise = factory();

        // Expiry and failure handlers remove the entry only if it is still the
        // one they were created for; a newer entry under the same key is kept.
        const evictIfCurrent = () => {
            const current = this.cache.get(key);
            if (current && current.promise === promise) {
                clearTimeout(current.timer);
                this.cache.delete(key);
            }
        };

        const timer = setTimeout(evictIfCurrent, this.ttl);
        this.cache.set(key, { promise, timer });

        // On failure, drop the entry right away so the next call retries.
        promise.then(undefined, evictIfCurrent);

        return promise;
    }

    /** Removes every entry and cancels all pending expiry timers. */
    clear(): void {
        for (const entry of this.cache.values()) {
            clearTimeout(entry.timer);
        }
        this.cache.clear();
    }

    /**
     * Removes the entry for `key` and cancels its expiry timer.
     *
     * @returns `true` if an entry existed.
     */
    delete(key: K): boolean {
        const entry = this.cache.get(key);
        if (entry) {
            clearTimeout(entry.timer);
        }
        return this.cache.delete(key);
    }
}

// 请求去重
/**
 * Shares one in-flight promise among all callers that use the same key.
 * The key is released as soon as the request settles, so later calls start a
 * fresh request.
 */
class RequestDeduplicator {
    private pendingRequests = new Map<string, Promise<any>>();

    /**
     * @param key     Identifies the request.
     * @param factory Starts the request; called only when none is in flight for `key`.
     * @returns The result of the in-flight (or newly started) request.
     */
    async dedupe<T>(key: string, factory: () => Promise<T>): Promise<T> {
        const inFlight = this.pendingRequests.get(key);
        if (inFlight !== undefined) {
            return inFlight;
        }

        const request = factory();
        this.pendingRequests.set(key, request);

        try {
            return await request;
        } finally {
            // Release the key whether the request succeeded or failed.
            this.pendingRequests.delete(key);
        }
    }
}

// 使用示例
const cache = new AsyncCache<string, User>();
const deduplicator = new RequestDeduplicator();

/** Looks a user up through the cache, with the deduplicator wrapped around the actual fetch. */
async function getUser(userId: string): Promise<User> {
    return cache.get(userId, () => 
        deduplicator.dedupe(`user:${userId}`, () => 
            fetchUserData(userId)
        )
    );
}

// Three calls for the same id result in a single underlying request.
const [user1, user2, user3] = await Promise.all([
    getUser('123'),
    getUser('123'),
    getUser('123')
]);

// All three callers received the very same object.
console.log('All users are the same:', user1 === user2 && user2 === user3);

6.6.2 内存管理

/**
 * Promise cache keyed by object identity and backed by a `WeakMap`, so an
 * entry does not keep its key object alive.
 *
 * A promise that rejects is removed again, so the next call for the same key
 * retries instead of replaying the stored failure.
 */
class WeakAsyncCache<K extends object, V> {
    private readonly cache = new WeakMap<K, Promise<V>>();

    /**
     * @param key Object used as the cache key.
     * @param factory Produces the value when `key` has no entry.
     * @returns The cached or freshly created value.
     * @throws Whatever `factory`'s promise rejects with; the failed entry is not kept.
     */
    async get(key: K, factory: () => Promise<V>): Promise<V> {
        const cached = this.cache.get(key);
        if (cached !== undefined) {
            return cached;
        }

        const pending = factory();
        this.cache.set(key, pending);

        try {
            return await pending;
        } catch (error) {
            // Only drop the entry if it is still the promise that failed.
            if (this.cache.get(key) === pending) {
                this.cache.delete(key);
            }
            throw error;
        }
    }
}

/**
 * Collects cleanup callbacks and runs them together.
 *
 * Every registered callback runs exactly once per registration, even when
 * another callback throws or rejects.
 */
class ResourceManager {
    private resources: Array<() => void | Promise<void>> = [];

    /**
     * @param cleanup Callback to run on the next `cleanup()`; may be sync or async.
     */
    register(cleanup: () => void | Promise<void>): void {
        this.resources.push(cleanup);
    }

    /**
     * Runs all registered callbacks (started in registration order) and waits
     * for all of them to settle.
     *
     * @throws The first failure (in registration order), after every callback has settled.
     */
    async cleanup(): Promise<void> {
        // Detach the list first so a failing callback is never run a second
        // time by a later cleanup() call.
        const pending = this.resources;
        this.resources = [];

        // The async wrapper turns a synchronous throw into a rejection, so one
        // bad callback cannot stop the remaining ones from being started.
        const outcomes = await Promise.allSettled(
            pending.map(async (cleanup) => {
                await cleanup();
            })
        );

        const failure = outcomes.find(
            (outcome): outcome is PromiseRejectedResult => outcome.status === 'rejected'
        );
        if (failure !== undefined) {
            throw failure.reason;
        }
    }
}

/**
 * Wraps `fetch` with an `AbortController` owned by this instance.
 *
 * Every request issued through the instance carries the same signal, so
 * `cancel()` aborts all of them; any `signal` passed in `options` is replaced.
 */
class CancellableRequest {
    private controller: AbortController = new AbortController();

    /**
     * @param url Request URL.
     * @param options Regular `fetch` options; `signal` is overridden.
     * @returns The `fetch` response promise.
     */
    async fetch(url: string, options: RequestInit = {}): Promise<Response> {
        const { signal } = this.controller;
        const init: RequestInit = { ...options, signal };
        return fetch(url, init);
    }

    /** Aborts the controller shared by this instance's requests. */
    cancel(): void {
        this.controller.abort();
    }
}

// Usage example
const resourceManager = new ResourceManager();

/**
 * Starts a cancellable request and registers its cancellation with
 * `resourceManager`.
 *
 * @returns The parsed JSON body of the response.
 * @throws Rethrows any fetch/parse error (including the abort error) after logging it.
 */
async function setupAsyncOperation() {
    const request = new CancellableRequest();

    // Register the cleanup callback before the request starts.
    resourceManager.register(() => {
        request.cancel();
        console.log('Request cancelled');
    });

    try {
        const response = await request.fetch('https://api.example.com/data');
        const data = await response.json();
        return data;
    } catch (error: unknown) {
        // The catch variable is `unknown` under `strict`; narrow before reading
        // `.name`. An aborted fetch rejects with a DOMException named
        // 'AbortError', which is an Error instance.
        if (error instanceof Error && error.name === 'AbortError') {
            console.log('Request was cancelled');
        } else {
            console.error('Request failed:', error);
        }
        throw error;
    }
}

// Release resources when the component unmounts or the page is left.
window.addEventListener('beforeunload', () => {
    // cleanup() is async and the page does not wait for it; start it
    // fire-and-forget and report a failure instead of leaving the rejection
    // unhandled.
    void resourceManager.cleanup().catch((error: unknown) => {
        console.error('Resource cleanup failed:', error);
    });
});

6.7 本章练习

练习 1:实现异步队列

// 实现一个异步队列,支持以下功能:
// 1. 添加任务到队列
// 2. 控制并发数量
// 3. 支持优先级
// 4. 支持任务取消
// 5. 提供进度回调

/**
 * A unit of work for the exercise's `AsyncQueue`.
 */
interface Task<T> {
    /** Identifier of the task; the usage example passes it to `queue.cancel(...)`. */
    id: string;
    /** Scheduling priority. NOTE(review): whether higher or lower runs first is left to the implementation. */
    priority: number;
    /** Performs the work and resolves with its result. */
    execute: () => Promise<T>;
    /** Optional progress callback; the usage example prints the value as a percentage. */
    onProgress?: (progress: number) => void;
}

/**
 * Exercise stub, intentionally left empty.
 * The usage example expects a constructor taking a number plus
 * `add`, `cancel` and `waitForAll` methods.
 */
class AsyncQueue<T> {
    // Your implementation
}

// Usage example
const queue = new AsyncQueue<any>(3); // maximum concurrency of 3

queue.add({
    id: 'task1',
    priority: 1,
    execute: async () => {
        // Simulate an asynchronous task
        await new Promise(resolve => setTimeout(resolve, 1000));
        return 'Task 1 completed';
    },
    onProgress: (progress) => console.log(`Task 1 progress: ${progress}%`)
});

queue.add({
    id: 'task2',
    priority: 2,
    execute: async () => {
        await new Promise(resolve => setTimeout(resolve, 2000));
        return 'Task 2 completed';
    }
});

// Cancel a task
queue.cancel('task1');

// Wait for all tasks to finish
const results = await queue.waitForAll();
console.log('All tasks completed:', results);

练习 2:实现数据流处理管道

// 实现一个数据流处理管道,支持:
// 1. 链式操作(map, filter, reduce等)
// 2. 异步处理
// 3. 错误处理
// 4. 背压控制

/**
 * Exercise stub, intentionally left empty.
 * The usage example expects chainable `map`, `filter` and `catch` methods
 * and a `process` method that takes the input array.
 */
class AsyncPipeline<T> {
    // Your implementation
}

// Usage example
const pipeline = new AsyncPipeline<number>()
    .map(async (x) => x * 2)
    .filter(async (x) => x > 10)
    .map(async (x) => ({ value: x, timestamp: Date.now() }))
    .catch((error, item) => {
        console.error('Error processing item:', item, error);
        return null; // skip the failing item
    });

// Process the data stream
const input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const results = await pipeline.process(input);
console.log('Pipeline results:', results);

练习 3:实现分布式锁

// 实现一个基于 Promise 的分布式锁机制:
// 1. 支持锁的获取和释放
// 2. 支持锁超时
// 3. 支持锁的重入
// 4. 提供锁状态查询

/**
 * Exercise stub, intentionally left empty.
 * The usage example expects `acquire(key, { timeout })` and `release(key)`,
 * both awaited.
 */
class DistributedLock {
    // Your implementation
}

// Usage example
const lock = new DistributedLock();

/**
 * Runs a simulated critical section while holding the lock `resource:123`.
 *
 * @param id Label used in the log output.
 * @throws Whatever `lock.acquire` rejects with (e.g. on timeout); in that case
 *         nothing is released because the lock was never held.
 */
async function criticalSection(id: string) {
    const lockKey = 'resource:123';

    // Acquire OUTSIDE the try block: if acquisition fails, `finally` must not
    // release a lock this caller never obtained (it might belong to someone else).
    await lock.acquire(lockKey, { timeout: 5000 });

    try {
        console.log(`${id} acquired lock`);

        // Run the critical code
        await new Promise(resolve => setTimeout(resolve, 2000));

        console.log(`${id} finished critical section`);
    } finally {
        await lock.release(lockKey);
        console.log(`${id} released lock`);
    }
}

// Run concurrently. The promise is deliberately not awaited, so attach a
// rejection handler instead of leaving a failure unhandled.
void Promise.all([
    criticalSection('Process A'),
    criticalSection('Process B'),
    criticalSection('Process C')
]).catch((error: unknown) => {
    console.error('Critical section failed:', error);
});

6.8 本章总结

本章深入探讨了 TypeScript 中的异步编程,包括:

  1. 异步编程基础:理解了 JavaScript 的异步模型和事件循环
  2. Promise 详解:掌握了 Promise 的使用、类型定义和组合方法
  3. async/await 详解:学习了现代异步编程语法和错误处理
  4. 生成器和异步迭代器:了解了数据流处理的高级模式
  5. 高级异步模式:掌握了发布订阅、响应式编程和状态机模式
  6. 性能优化:学习了缓存、去重和资源管理等优化技术

异步编程是现代 JavaScript/TypeScript 开发的核心技能,正确理解和使用这些概念对于构建高性能、可维护的应用程序至关重要。

下一章我们将学习错误处理和调试技巧,了解如何在 TypeScript 项目中有效地处理错误和进行调试。