1. WinDbg扩展概述

1.1 扩展类型

WinDbg支持多种类型的扩展,每种都有其特定的用途和优势:

传统扩展(Legacy Extensions)

  • 特点:基于C/C++开发,使用WinDbg SDK
  • 优势:性能高,功能强大,访问底层API
  • 适用场景:复杂的调试分析,性能敏感的操作

JavaScript扩展

  • 特点:基于JavaScript,使用数据模型API
  • 优势:开发简单,调试方便,动态加载
  • 适用场景:快速原型开发,数据分析脚本

Python扩展

  • 特点:基于Python,通过pykd模块
  • 优势:语法简洁,库丰富,易于维护
  • 适用场景:数据处理,自动化分析

1.2 扩展架构

WinDbg扩展架构图

┌─────────────────────────────────────────────────────────┐
│                      WinDbg Engine                      │
├─────────────────────────────────────────────────────────┤
│                    Extension Manager                    │
├─────────────────┬─────────────────┬─────────────────────┤
│                 │                 │                     │
│  Legacy         │  JavaScript     │     Python          │
│  Extensions     │  Extensions     │     Extensions      │
│  (.dll)         │  (.js)          │     (.py)           │
│                 │                 │                     │
├─────────────────┼─────────────────┼─────────────────────┤
│                 │                 │                     │
│  WinDbg SDK     │  Data Model     │     PyKD            │
│  APIs           │  APIs           │     Module          │
│                 │                 │                     │
└─────────────────┴─────────────────┴─────────────────────┘

扩展加载机制

# 加载扩展
.load <扩展路径>         # 加载传统扩展
.scriptload <脚本路径>   # 加载JavaScript扩展
.load pykd; !py <脚本路径>  # 先加载pykd扩展,再执行Python脚本

# 卸载扩展
.unload <扩展名>         # 卸载传统扩展
.scriptunload <脚本名>   # 卸载JavaScript扩展

1.3 开发环境准备

开发工具

# Visual Studio(推荐2019或更高版本)
- C++ 开发工具
- Windows SDK
- WinDbg SDK

# 替代工具
- Visual Studio Code
- CLion
- Qt Creator

SDK和库

# Windows SDK
- 包含调试API头文件
- 提供调试引擎接口

# WinDbg SDK
- 扩展开发专用SDK
- 包含示例代码

# 标准库与第三方库
- Boost(可选)
- STL(标准库)

2. 传统扩展开发

2.1 开发环境搭建

项目创建

// 创建新的DLL项目
// 项目设置:
// - 配置类型:动态库(.dll)
// - 字符集:Unicode
// - 平台工具集:v142或更高

// 包含必要的头文件
#include <windows.h>
#include <dbgeng.h>
#include <dbghelp.h>
#include <wdbgexts.h>

// 链接必要的库
#pragma comment(lib, "dbgeng.lib")
#pragma comment(lib, "dbghelp.lib")

基本框架

// extension.cpp
#include "stdafx.h"

// Global state shared by the whole extension.
// NOTE(review): the SDK helper macros (dprintf, GetExpression, ...) presumably
// expect a global with exactly the name ExtensionApis - confirm in wdbgexts.h.
WINDBG_EXTENSION_APIS ExtensionApis;   // callback table copied in WinDbgExtensionDllInit
HANDLE hCurrentProcess;                // assigned in WinDbgExtensionDllInit
HANDLE hCurrentThread;                 // assigned in WinDbgExtensionDllInit
DWORD dwCurrentPc;                     // assigned in WinDbgExtensionDllInit
DWORD dwProcessor;                     // assigned in WinDbgExtensionDllInit
HMODULE hModule;                       // this DLL's module handle, stored by DllMain

// DLL entry point. Remembers the module handle when the DLL is attached to a
// process; thread attach/detach and process detach need no work.
BOOL APIENTRY DllMain(HMODULE hModule, DWORD ul_reason_for_call, LPVOID lpReserved)
{
    const bool attachingToProcess = (ul_reason_for_call == DLL_PROCESS_ATTACH);
    if (attachingToProcess)
    {
        // The parameter shadows the file-level variable, hence the :: qualifier.
        ::hModule = hModule;
    }

    // Every notification is reported as handled successfully.
    return TRUE;
}

// Extension initialization, called by the debugger after the DLL is loaded.
//
// lpExtensionApis - callback table supplied by the debugger; copied into the
//                   global ExtensionApis so dprintf and the other helper
//                   macros can reach the debugger.
// MajorVersion    - version value passed by the debugger (not used here).
// MinorVersion    - version value passed by the debugger (not used here).
VOID WinDbgExtensionDllInit(PWINDBG_EXTENSION_APIS lpExtensionApis, USHORT MajorVersion, USHORT MinorVersion)
{
    (void)MajorVersion;
    (void)MinorVersion;

    ExtensionApis = *lpExtensionApis;

    // WINDBG_EXTENSION_APIS offers no callbacks for process/thread handles, and
    // no target is guaranteed to be selected while the DLL is being loaded.
    // The debugger passes the live values to every command instead (see the
    // DECLARE_API parameter list), so the cached copies start out empty.
    hCurrentProcess = NULL;
    hCurrentThread = NULL;
    dwCurrentPc = 0;
    dwProcessor = 0;

    dprintf("Extension loaded successfully\n");
}

// Reports the extension API version this DLL identifies itself with.
// The returned pointer refers to a function-local static, so it stays valid
// after the call returns.
LPEXT_API_VERSION ExtensionApiVersion(void)
{
    // Fields in order: MajorVersion, MinorVersion, Revision, Reserved.
    static EXT_API_VERSION s_apiVersion = { 5, 2, EXT_API_VERSION_NUMBER64, 0 };

    return &s_apiVersion;
}

// Version-check hook. The body is an empty placeholder; despite the previous
// heading this is not a cleanup routine.
VOID CheckVersion(void)
{
    // Version-check code goes here.
}

2.2 基本扩展命令

简单命令实现

// Minimal extension command (!hello): prints a greeting followed by the
// current process handle, thread handle and program counter.
// NOTE(review): with the DECLARE_API parameter list shown in stdafx.h these
// names are the command's parameters, which shadow the file-level globals of
// the same name - confirm which macro definition is actually in effect.
DECLARE_API(hello)
{
    dprintf("Hello from WinDbg extension!\n");
    dprintf("Current process: 0x%p\n", hCurrentProcess);
    dprintf("Current thread: 0x%p\n", hCurrentThread);
    dprintf("Current PC: 0x%08x\n", dwCurrentPc);
}

// Command with arguments: !showmem <address> [size]
//
// Dumps target memory starting at <address> as rows of up to 16 hex bytes
// followed by an ASCII column. [size] defaults to 16 bytes and is capped at
// the size of the local buffer (256 bytes). An address that is missing or
// evaluates to 0 prints the usage text.
DECLARE_API(showmem)
{
    UCHAR buffer[256];
    ULONG64 address = 0;
    ULONG64 requested = 16;

    // Parse "<address> [size]". GetExpressionEx evaluates a single expression
    // and reports where it stopped, so the optional size is taken from the
    // remainder. This avoids both feeding the whole argument string to one
    // evaluation and casting away the constness of args.
    if (args && *args)
    {
        PCSTR rest = NULL;
        if (GetExpressionEx(args, &address, &rest) && rest)
        {
            while (*rest == ' ' || *rest == '\t')
            {
                ++rest;
            }

            if (*rest)
            {
                ULONG64 parsedSize = 0;
                PCSTR unused = NULL;
                if (GetExpressionEx(rest, &parsedSize, &unused))
                {
                    requested = parsedSize;
                }
            }
        }
    }

    if (address == 0)
    {
        dprintf("Usage: !showmem <address> [size]\n");
        return;
    }

    // Never ask for more than the buffer can hold.
    const ULONG toRead = (requested < sizeof(buffer)) ? (ULONG)requested : (ULONG)sizeof(buffer);
    ULONG bytesRead = 0;

    if (!ReadMemory(address, buffer, toRead, &bytesRead))
    {
        dprintf("Failed to read memory at 0x%I64x\n", address);
        return;
    }

    dprintf("Memory at 0x%I64x:\n", address);
    for (ULONG row = 0; row < bytesRead; row += 16)
    {
        dprintf("%08I64x: ", address + row);

        // Hex column
        for (ULONG col = 0; col < 16 && (row + col) < bytesRead; col++)
        {
            dprintf("%02x ", buffer[row + col]);
        }

        // ASCII column: non-printable bytes are shown as '.'
        dprintf(" ");
        for (ULONG col = 0; col < 16 && (row + col) < bytesRead; col++)
        {
            const UCHAR c = buffer[row + col];
            dprintf("%c", (c >= 32 && c <= 126) ? c : '.');
        }
        dprintf("\n");
    }
}

复杂命令实现

// Stack analysis command (!analyzestack): prints the stack bounds and usage,
// then walks from the current stack pointer up to the stack base and prints
// every slot whose value resolves to a symbol.
//
// NOTE(review): GetCurrentThreadStackLimits looks like the Win32 API of that
// name, which would describe the calling (debugger-side) thread rather than
// the debuggee and does not return a status code - confirm.
// NOTE(review): "@esp" is an x86 register name, while the walk below reads
// 8 bytes per slot - confirm the intended target architecture.
// NOTE(review): the walk has no user-interrupt check; a large stack cannot be
// cancelled once the loop starts.
DECLARE_API(analyzestack)
{
    ULONG64 stackBase, stackLimit;
    ULONG64 currentSp;
    
    // Fetch the stack bounds
    if (GetCurrentThreadStackLimits(&stackLimit, &stackBase) != S_OK)
    {
        dprintf("Failed to get stack limits\n");
        return;
    }
    
    currentSp = GetExpression("@esp");
    
    dprintf("Stack Analysis:\n");
    dprintf("Stack Base: 0x%I64x\n", stackBase);
    dprintf("Stack Limit: 0x%I64x\n", stackLimit);
    dprintf("Current SP: 0x%I64x\n", currentSp);
    dprintf("Stack Size: %I64d bytes\n", stackBase - stackLimit);
    dprintf("Used Stack: %I64d bytes\n", stackBase - currentSp);
    
    // Walk the stack contents one ULONG64 slot at a time
    ULONG64 addr = currentSp;
    while (addr < stackBase)
    {
        ULONG64 value;
        if (ReadMemory(addr, &value, sizeof(value), NULL))
        {
            // Only slots whose value maps to a symbol are reported
            // NOTE(review): GetSymbol is compared against S_OK here - confirm
            // that the helper actually returns a status value.
            CHAR symbolName[256];
            ULONG64 displacement;
            
            if (GetSymbol(value, symbolName, &displacement) == S_OK)
            {
                dprintf("0x%I64x: 0x%I64x -> %s+0x%I64x\n", 
                       addr, value, symbolName, displacement);
            }
        }
        addr += sizeof(ULONG64);
    }
}

2.3 高级功能实现

符号解析

// Symbol-resolution helper.
//
// address      - address to look up.
// symbolName   - caller-supplied buffer that receives the symbol name.
// nameSize     - size of symbolName; longer names are truncated.
// displacement - receives the offset of address from the symbol start.
//
// Returns TRUE when a symbol was found and copied, FALSE otherwise (the
// output buffer is left untouched in that case).
BOOL ResolveSymbol(ULONG64 address, PSTR symbolName, ULONG nameSize, PULONG64 displacement)
{
    SYMBOL_INFO_PACKAGE package;
    package.si.SizeOfStruct = sizeof(SYMBOL_INFO);
    package.si.MaxNameLen = MAX_SYM_NAME;

    const BOOL found = SymFromAddr(hCurrentProcess, address, displacement, &package.si);
    if (!found)
    {
        return FALSE;
    }

    strncpy_s(symbolName, nameSize, package.si.Name, _TRUNCATE);
    return TRUE;
}

// Module information command: !modinfo [address]
//
// With an argument, the expression is used as the module base address.
// Without one, the module containing the current instruction pointer is used.
// Prints the base, size, names, symbol type and PDB path of that module.
DECLARE_API(modinfo)
{
    ULONG64 moduleBase = 0;

    if (args && *args)
    {
        moduleBase = GetExpression(args);
    }
    else
    {
        // $ip is the architecture-neutral instruction-pointer pseudo-register,
        // so this works on x86 and x64 targets alike (unlike @eip).
        // NOTE(review): GetModuleBase is not defined in this snippet;
        // presumably it is a helper declared elsewhere - confirm.
        moduleBase = GetModuleBase(GetExpression("@$ip"));
    }

    if (moduleBase == 0)
    {
        dprintf("Invalid module address\n");
        return;
    }

    // Zero the structure first so no field is ever printed uninitialised;
    // SizeOfStruct must be set before the call.
    IMAGEHLP_MODULE64 moduleInfo = {};
    moduleInfo.SizeOfStruct = sizeof(moduleInfo);

    if (!SymGetModuleInfo64(hCurrentProcess, moduleBase, &moduleInfo))
    {
        dprintf("Failed to get module information\n");
        return;
    }

    dprintf("Module Information:\n");
    dprintf("Base Address: 0x%I64x\n", moduleInfo.BaseOfImage);
    dprintf("Image Size: 0x%x\n", moduleInfo.ImageSize);
    dprintf("Module Name: %s\n", moduleInfo.ModuleName);
    dprintf("Image Name: %s\n", moduleInfo.ImageName);
    dprintf("Loaded Image Name: %s\n", moduleInfo.LoadedImageName);
    dprintf("Symbol Type: %d\n", moduleInfo.SymType);
    // LoadedPdbName holds the PDB path; CVData is the raw CodeView record.
    dprintf("PDB File: %s\n", moduleInfo.LoadedPdbName);
}

内存分析

// Memory region command: !memregion [address]
//
// Queries the region containing the given address (default: the current
// instruction pointer) and prints its base, allocation base, size, state and
// protection.
// NOTE(review): VirtualQueryEx needs a real process handle, so this presumably
// only works for live user-mode targets on the local machine - confirm.
DECLARE_API(memregion)
{
    // $ip is the architecture-neutral instruction pointer (unlike @eip).
    const ULONG64 address = (args && *args) ? GetExpression(args) : GetExpression("@$ip");

    // VirtualQueryEx always fills the native MEMORY_BASIC_INFORMATION layout.
    // Reading the result through MEMORY_BASIC_INFORMATION64 gives wrong fields
    // in a 32-bit build, so the native structure is used and widened on output.
    MEMORY_BASIC_INFORMATION mbi = {};
    if (VirtualQueryEx(hCurrentProcess, (LPCVOID)(ULONG_PTR)address, &mbi, sizeof(mbi)) == 0)
    {
        dprintf("Failed to query memory at 0x%I64x\n", address);
        return;
    }

    dprintf("Memory Region Information:\n");
    dprintf("Base Address: 0x%I64x\n", (ULONG64)(ULONG_PTR)mbi.BaseAddress);
    dprintf("Allocation Base: 0x%I64x\n", (ULONG64)(ULONG_PTR)mbi.AllocationBase);
    dprintf("Region Size: 0x%I64x\n", (ULONG64)mbi.RegionSize);

    dprintf("State: ");
    switch (mbi.State)
    {
    case MEM_COMMIT:
        dprintf("Committed\n");
        break;
    case MEM_FREE:
        dprintf("Free\n");
        break;
    case MEM_RESERVE:
        dprintf("Reserved\n");
        break;
    default:
        dprintf("Unknown (0x%x)\n", mbi.State);
        break;
    }

    // The low byte carries the access type; PAGE_GUARD / PAGE_NOCACHE /
    // PAGE_WRITECOMBINE are modifier bits OR-ed on top. Switching on the raw
    // value would send e.g. a guarded read/write page to the default branch.
    const DWORD baseProtect = mbi.Protect & 0xFF;
    dprintf("Protection: ");
    switch (baseProtect)
    {
    case PAGE_NOACCESS:
        dprintf("No Access");
        break;
    case PAGE_EXECUTE:
        dprintf("Execute");
        break;
    case PAGE_EXECUTE_READ:
        dprintf("Execute/Read");
        break;
    case PAGE_EXECUTE_READWRITE:
        dprintf("Execute/Read/Write");
        break;
    case PAGE_EXECUTE_WRITECOPY:
        dprintf("Execute/Write Copy");
        break;
    case PAGE_READONLY:
        dprintf("Read Only");
        break;
    case PAGE_READWRITE:
        dprintf("Read/Write");
        break;
    case PAGE_WRITECOPY:
        dprintf("Write Copy");
        break;
    default:
        dprintf("Other (0x%x)", mbi.Protect);
        break;
    }
    if (mbi.Protect & PAGE_GUARD)
    {
        dprintf(" [Guard]");
    }
    if (mbi.Protect & PAGE_NOCACHE)
    {
        dprintf(" [NoCache]");
    }
    if (mbi.Protect & PAGE_WRITECOMBINE)
    {
        dprintf(" [WriteCombine]");
    }
    dprintf("\n");
}

2.4 扩展编译和部署

项目配置

// stdafx.h
#pragma once

#include "targetver.h"

#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// WinDbg扩展头文件
#include <wdbgexts.h>
#include <dbgeng.h>
#include <dbghelp.h>

// 链接库
#pragma comment(lib, "dbgeng.lib")
#pragma comment(lib, "dbghelp.lib")

// Extension command declaration macro.
// wdbgexts.h (included above) normally provides DECLARE_API already, so this
// fallback is only used when the SDK header did not define it. Commands return
// nothing: their bodies use a bare "return;", which would not compile with a
// bool return type.
#ifndef DECLARE_API
#define DECLARE_API(extension) \
    CPPMOD VOID extension(HANDLE hCurrentProcess, HANDLE hCurrentThread, \
                          ULONG dwCurrentPc, ULONG dwProcessor, PCSTR args)
#endif

模块定义文件

; extension.def
; Module-definition file: lists the functions the DLL exports by name.
; The first three entries are the extension entry points; the remaining
; entries are the commands declared with DECLARE_API in extension.cpp.
EXPORTS
    WinDbgExtensionDllInit
    ExtensionApiVersion
    CheckVersion
    hello
    showmem
    analyzestack
    modinfo
    memregion

编译脚本

@echo off
rem Build script for the WinDbg extension DLL.
rem Comments use "rem": a leading "#" is not a comment in batch files and
rem would be executed as a (failing) command.
echo Building WinDbg Extension...

rem Make the Debugging Tools headers and import libraries visible to cl/link.
rem The quoted "name=value" form keeps the parentheses and spaces in the
rem path from being misparsed.
set "INCLUDE=%INCLUDE%;C:\Program Files (x86)\Windows Kits\10\Debuggers\inc"
set "LIB=%LIB%;C:\Program Files (x86)\Windows Kits\10\Debuggers\lib\x64"

rem Compile and link the extension; the .def file supplies the export list.
cl /LD /MD extension.cpp /link /DEF:extension.def /OUT:myextension.dll

if %ERRORLEVEL% == 0 (
    echo Build successful!
    echo Extension: myextension.dll
) else (
    echo Build failed!
)

pause

3. JavaScript扩展开发

3.1 JavaScript扩展基础

基本结构

// myextension.js
"use strict";

// Script entry point: declares the API version the script was written for
// and registers the three command aliases.
function initializeScript() {
    const registrations = [new host.apiVersionSupport(1, 3)];

    registrations.push(new host.functionAlias(analyzeHeap, "analyzeheap"));
    registrations.push(new host.functionAlias(showThreads, "showthreads"));
    registrations.push(new host.functionAlias(findPattern, "findpattern"));

    return registrations;
}

// Script teardown hook; the body is currently an empty placeholder.
function uninitializeScript() {
    // Cleanup code goes here.
}

// Heap analysis: echoes the output of "!heap -s", then the output of
// "!heap -stat -h 0" under its own heading. Any failure is reported as a
// single "Error: ..." line instead of propagating.
function analyzeHeap() {
    const debuggerControl = host.namespace.Debugger.Utility.Control;
    const emit = (text) => host.diagnostics.debugLog(text);
    const emitLines = (lines) => {
        for (const entry of lines) {
            emit(entry + "\n");
        }
    };

    emit("=== Heap Analysis ===\n");

    try {
        // Heap summary
        emitLines(debuggerControl.ExecuteCommand("!heap -s"));

        // The statistics command runs before its heading is printed, so a
        // failure leaves no dangling heading behind.
        const statistics = debuggerControl.ExecuteCommand("!heap -stat -h 0");
        emit("\n=== Heap Statistics ===\n");
        emitLines(statistics);
    } catch (failure) {
        emit("Error: " + failure.message + "\n");
    }
}

线程分析

// Thread analysis: for every thread of the current process, prints the thread
// ID (hex), makes that thread current and prints its call stack.
// Any failure is reported as a single "Error: ..." line.
function showThreads() {
    var control = host.namespace.Debugger.Utility.Control;

    host.diagnostics.debugLog("=== Thread Analysis ===\n");

    try {
        // Enumerate all threads
        var threads = host.currentProcess.Threads;

        for (var thread of threads) {
            var threadIdHex = thread.Id.toString(16);
            host.diagnostics.debugLog("Thread ID: " + threadIdHex + "\n");

            // Switch to the thread. "~<n>s" would interpret the number as the
            // debugger's thread index; a system thread ID has to be written
            // as ~~[tid].
            control.ExecuteCommand("~~[" + threadIdHex + "]s");

            // Dump the thread's call stack
            var stack = control.ExecuteCommand("k");
            for (var line of stack) {
                host.diagnostics.debugLog("  " + line + "\n");
            }

            host.diagnostics.debugLog("\n");
        }

    } catch (e) {
        host.diagnostics.debugLog("Error: " + e.message + "\n");
    }
}

// Byte-pattern search: runs "s -b <start> <end> <pattern>" and echoes every
// output line followed by the number of lines returned.
//   pattern   - bytes to search for (required; usage text is printed if missing)
//   startAddr - start of the range, defaults to "0"
//   endAddr   - end of the range, defaults to "L?80000000"
function findPattern(pattern, startAddr, endAddr) {
    const emit = (text) => host.diagnostics.debugLog(text);

    if (!pattern) {
        emit("Usage: !findpattern <pattern> [startAddr] [endAddr]\n");
        return;
    }

    const debuggerControl = host.namespace.Debugger.Utility.Control;

    // Fall back to the default search range for anything not supplied
    const rangeStart = startAddr ? startAddr : "0";
    const rangeEnd = endAddr ? endAddr : "L?80000000";

    try {
        let command = "s -b ";
        command += rangeStart;
        command += " ";
        command += rangeEnd;
        command += " ";
        command += pattern;
        emit("Searching: " + command + "\n");

        let matches = 0;
        for (const hit of debuggerControl.ExecuteCommand(command)) {
            emit(hit + "\n");
            matches += 1;
        }

        emit("Found " + matches + " matches\n");
    } catch (failure) {
        emit("Error: " + failure.message + "\n");
    }
}

3.2 数据模型扩展

对象模型扩展

// Process extension: adds heap, module and thread CPU-time properties to a
// process object.
class ProcessExtension {
    constructor(process) {
        this.__process = process;
    }

    // Trimmed lines of "!heap -s" that mention "Heap".
    get HeapInfo() {
        const heapLines = [];
        const summary = host.namespace.Debugger.Utility.Control.ExecuteCommand("!heap -s");

        for (const text of summary) {
            if (!text.includes("Heap")) {
                continue;
            }
            heapLines.push(text.trim());
        }
        return heapLines;
    }

    // One { Name, BaseAddress, Size } record per module of the wrapped process.
    get ModuleInfo() {
        const described = [];

        for (const loaded of this.__process.Modules) {
            const { Name, BaseAddress, Size } = loaded;
            described.push({ Name, BaseAddress, Size });
        }
        return described;
    }

    // Trimmed lines of "!runaway" that contain a colon.
    get ThreadCpuTime() {
        const timeLines = [];
        const report = host.namespace.Debugger.Utility.Control.ExecuteCommand("!runaway");

        for (const text of report) {
            if (!text.includes(":")) {
                continue;
            }
            timeLines.push(text.trim());
        }
        return timeLines;
    }
}

// Script entry point: attaches ProcessExtension to the debugger's process
// model as a parent model.
function initializeScript() {
    const versionSupport = new host.apiVersionSupport(1, 3);
    const processModel = new host.namedModelParent(ProcessExtension, "Debugger.Models.Process");

    return [versionSupport, processModel];
}

自定义可视化

// Custom visualizer for a native object: provides a display string and makes
// the object iterable as three named entries.
class CustomVisualizer {
    constructor(obj) {
        this.__obj = obj;
    }

    // Display string: "Custom Object at <hex location>".
    toString() {
        return "Custom Object at " + this.__obj.targetLocation.toString(16);
    }

    // Custom iterator. host.indexedValue takes the value first and the list of
    // indices second, so each field value is paired with its name as the
    // single index.
    *[Symbol.iterator]() {
        yield new host.indexedValue(this.__obj.field1, ["Field1"]);
        yield new host.indexedValue(this.__obj.field2, ["Field2"]);
        yield new host.indexedValue(this.__obj.field3, ["Field3"]);
    }
}

// Script entry point: registers CustomVisualizer for the type "MyStruct".
function initializeScript() {
    const versionSupport = new host.apiVersionSupport(1, 3);
    const visualizer = new host.typeSignatureRegistration(CustomVisualizer, "MyStruct");

    return [versionSupport, visualizer];
}

3.3 高级JavaScript技巧

异步操作

// Asynchronous analysis: runs "!heap -s" ten times, reporting the number of
// output lines after each step, and pauses between steps when the host offers
// a timer. Any failure is reported as a single "Error: ..." line.
// Returns a promise that settles once all steps have finished.
async function asyncAnalyze() {
    var control = host.namespace.Debugger.Utility.Control;

    host.diagnostics.debugLog("Starting async analysis...\n");

    // NOTE(review): the debugger's script host is not a browser or Node, so
    // setTimeout is presumably unavailable there - confirm. Without the guard
    // the first step would fail on the missing function.
    var hasTimer = typeof setTimeout === "function";

    try {
        // Simulate a long-running analysis
        for (var i = 0; i < 10; i++) {
            host.diagnostics.debugLog("Step " + (i + 1) + "/10\n");

            // Run one analysis step
            var output = control.ExecuteCommand("!heap -s");

            // Process the output
            var lineCount = 0;
            for (var line of output) {
                lineCount++;
            }

            host.diagnostics.debugLog("Processed " + lineCount + " lines\n");

            // Simulated delay; degrades to a plain yield without a timer
            if (hasTimer) {
                await new Promise(resolve => setTimeout(resolve, 100));
            } else {
                await Promise.resolve();
            }
        }

        host.diagnostics.debugLog("Async analysis complete\n");

    } catch (e) {
        host.diagnostics.debugLog("Error: " + e.message + "\n");
    }
}

// Script entry point: exposes asyncAnalyze under the alias "asyncanalyze".
function initializeScript() {
    const versionSupport = new host.apiVersionSupport(1, 3);
    const analyzeAlias = new host.functionAlias(asyncAnalyze, "asyncanalyze");

    return [versionSupport, analyzeAlias];
}

错误处理和日志

// Leveled logger that writes timestamped lines to the debugger output.
// Messages below the configured level are dropped, as are messages with a
// level name that is not in the table.
class Logger {
    constructor(logLevel = "INFO") {
        this.logLevel = logLevel;
        this.levels = { "DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3 };
    }

    // Writes "[<ISO timestamp>] [<level>] <message>" when the level passes the
    // threshold.
    log(level, message) {
        const passesThreshold = this.levels[level] >= this.levels[this.logLevel];
        if (!passesThreshold) {
            return;
        }

        const stamp = new Date().toISOString();
        host.diagnostics.debugLog("[" + stamp + "] [" + level + "] " + message + "\n");
    }

    debug(message) {
        this.log("DEBUG", message);
    }

    info(message) {
        this.log("INFO", message);
    }

    warn(message) {
        this.log("WARN", message);
    }

    error(message) {
        this.log("ERROR", message);
    }
}

// Shared logger instance, configured to show everything.
var logger = new Logger("DEBUG");

// Analysis with layered error handling: runs a fixed list of debugger
// commands and logs how many lines each returned. A failing command is logged
// and skipped; any other failure aborts the run and is logged with its stack.
function robustAnalyze() {
    logger.info("Starting robust analysis");

    try {
        const debuggerControl = host.namespace.Debugger.Utility.Control;

        // Refuse to run without a debug target
        if (!host.currentProcess) {
            throw new Error("No current process");
        }

        logger.debug("Process ID: " + host.currentProcess.Id);

        // Runs one command and reports its line count; failures stay local
        const runOne = (command) => {
            logger.debug("Executing: " + command);

            try {
                let produced = 0;
                for (const unusedLine of debuggerControl.ExecuteCommand(command)) {
                    produced += 1;
                }

                logger.info("Command '" + command + "' returned " + produced + " lines");
            } catch (commandFailure) {
                logger.error("Command '" + command + "' failed: " + commandFailure.message);
            }
        };

        for (const command of ["!heap -s", "!handle", "lm"]) {
            runOne(command);
        }

        logger.info("Robust analysis complete");
    } catch (failure) {
        logger.error("Analysis failed: " + failure.message);
        logger.error("Stack trace: " + failure.stack);
    }
}

4. Python扩展开发

4.1 PyKD环境搭建

安装PyKD

# 安装PyKD模块
# 方法1:使用pip
pip install pykd

# 方法2:从源码编译
# 下载PyKD源码
# 使用Visual Studio编译

# 方法3:使用预编译版本
# 从GitHub下载预编译的pykd.pyd文件

基本配置

# pykd_config.py
import sys
import os

# Make the PyKD install directory importable.
sys.path.append(r"C:\pykd")

# Import PyKD; exit with status 1 if the module cannot be loaded.
try:
    import pykd
    print("PyKD loaded successfully")
except ImportError as e:
    print(f"Failed to load PyKD: {e}")
    sys.exit(1)

# Basic setup.
# NOTE(review): presumably only needed when running outside the debugger - confirm.
pykd.initialize()

4.2 基本Python扩展

简单分析脚本

# heap_analyzer.py
import pykd
import re

class HeapAnalyzer:
    """Summarise the process heaps reported by the debugger's ``!heap`` command.

    Attributes are filled in by :meth:`analyze`:
    ``heaps`` is a list of ``{'id', 'address', 'line'}`` dicts, and
    ``total_committed`` / ``total_reserved`` are byte counts.
    """

    # A hexadecimal number with an optional 0x prefix. Matching only real hex
    # digits keeps int(..., 16) from raising on text such as "n/a".
    _HEX_VALUE = r'((?:0[xX])?[0-9a-fA-F]+)\b'

    def __init__(self):
        self.heaps = []
        self.total_committed = 0
        self.total_reserved = 0

    def analyze(self):
        """Run the full heap analysis and print the results.

        Errors are caught and reported on stdout rather than propagated.
        """
        print("=== Heap Analysis ===")

        # Start from a clean slate so repeated runs don't pile up duplicates.
        self.heaps = []
        self.total_committed = 0
        self.total_reserved = 0

        try:
            # Run the heap summary command
            output = pykd.dbgCommand("!heap -s")

            # Parse its output
            self._parse_heap_output(output)

            # Show the totals
            self._show_statistics()

            # Per-heap details
            self._detailed_analysis()

        except Exception as e:
            print(f"Error during heap analysis: {e}")

    def _parse_heap_output(self, output):
        """Extract heap entries and the committed/reserved totals from *output*.

        Lines whose number is not valid hexadecimal are ignored instead of
        aborting the whole parse.
        """
        for line in output.split('\n'):
            if 'Heap' in line and 'at' in line:
                # Heap entry: "Heap <id> at <address>"
                match = re.search(r'Heap\s+(\w+)\s+at\s+(\w+)', line)
                if match:
                    self.heaps.append({
                        'id': match.group(1),
                        'address': match.group(2),
                        'line': line.strip()
                    })

            elif 'Total committed' in line:
                match = re.search(r'Total committed\s+' + self._HEX_VALUE, line)
                if match:
                    self.total_committed = int(match.group(1), 16)

            elif 'Total reserved' in line:
                match = re.search(r'Total reserved\s+' + self._HEX_VALUE, line)
                if match:
                    self.total_reserved = int(match.group(1), 16)

    def _show_statistics(self):
        """Print the heap count, the totals and the committed/reserved ratio."""
        print("\nHeap Statistics:")
        print(f"Number of heaps: {len(self.heaps)}")
        print(f"Total committed: 0x{self.total_committed:x} ({self.total_committed:,} bytes)")
        print(f"Total reserved: 0x{self.total_reserved:x} ({self.total_reserved:,} bytes)")

        # Guarded so an empty summary cannot divide by zero.
        if self.total_reserved > 0:
            usage_percent = (self.total_committed / self.total_reserved) * 100
            print(f"Heap usage: {usage_percent:.2f}%")

    def _detailed_analysis(self):
        """Run ``!heap -stat`` for every parsed heap and print its block summary.

        A failure on one heap is reported and does not stop the others.
        """
        print("\nDetailed Heap Analysis:")

        for heap in self.heaps:
            print(f"\nAnalyzing heap {heap['id']} at {heap['address']}:")

            try:
                # Per-heap statistics
                cmd = f"!heap -stat -h {heap['address']}"
                output = pykd.dbgCommand(cmd)

                # Block size distribution
                self._analyze_heap_blocks(output)

            except Exception as e:
                print(f"  Error analyzing heap {heap['id']}: {e}")

    def _analyze_heap_blocks(self, output):
        """Print the five block sizes with the largest total from *output*.

        Each usable line is expected to start with three hexadecimal columns
        (size, count, total); lines that do not parse as hex are skipped.
        """
        blocks = []

        for line in output.split('\n'):
            if not re.match(r'\s*\w+\s+\w+\s+\w+', line):
                continue

            parts = line.split()
            if len(parts) < 3:
                continue

            try:
                blocks.append({
                    'size': int(parts[0], 16),
                    'count': int(parts[1], 16),
                    'total': int(parts[2], 16)
                })
            except ValueError:
                # Header or text line that merely looks like three columns
                continue

        if blocks:
            # Largest total first
            blocks.sort(key=lambda x: x['total'], reverse=True)

            print("  Top heap block sizes:")
            for i, block in enumerate(blocks[:5]):
                print(f"    {i+1}. Size: 0x{block['size']:x}, Count: {block['count']}, Total: 0x{block['total']:x}")

# Usage example: run the heap analysis when the file is executed as a script.
if __name__ == "__main__":
    analyzer = HeapAnalyzer()
    analyzer.analyze()

线程分析脚本

# thread_analyzer.py
import pykd
import time

class ThreadAnalyzer:
    """Thread overview built from debugger command output.

    ``threads`` is a list of ``{'id', 'line'}`` dicts filled in by
    :meth:`analyze_threads`; ``id`` is the number shown in the first column of
    the ``~`` command output.
    """

    def __init__(self):
        self.threads = []
        self.cpu_times = {}

    def analyze_threads(self):
        """Run all thread analyses and print the results.

        Errors are caught and reported on stdout rather than propagated.
        """
        print("=== Thread Analysis ===")

        try:
            # Thread list
            self._get_thread_list()

            # CPU usage
            self._analyze_cpu_usage()

            # Call stacks
            self._analyze_thread_stacks()

            # Deadlock detection
            self._detect_deadlocks()

        except Exception as e:
            print(f"Error during thread analysis: {e}")

    def _parse_thread_lines(self, output):
        """Return the ``{'id', 'line'}`` entries found in ``~`` command output.

        The debugger prefixes the current thread with ``.`` and the thread
        that raised the event with ``#``; those markers are stripped so that
        such threads are not dropped from the list.
        """
        parsed = []
        for line in output.split('\n'):
            text = line.strip().lstrip('.#*~').strip()
            parts = text.split()
            if len(parts) < 2:
                continue

            thread_id = parts[0].replace('~', '').replace('*', '')
            if thread_id.isdigit():
                parsed.append({
                    'id': int(thread_id),
                    'line': line.strip()
                })
        return parsed

    def _get_thread_list(self):
        """Refresh ``self.threads`` from the ``~`` command."""
        output = pykd.dbgCommand("~")

        # Replace rather than append so repeated runs don't duplicate entries.
        self.threads = self._parse_thread_lines(output)

        print(f"Found {len(self.threads)} threads")

    def _analyze_cpu_usage(self):
        """Print the per-thread lines of ``!runaway`` that contain ':' and 'ms'."""
        print("\nCPU Usage Analysis:")

        try:
            output = pykd.dbgCommand("!runaway")

            for line in output.split('\n'):
                if ':' in line and 'ms' in line:
                    parts = line.split(':')
                    if len(parts) >= 2:
                        thread_info = parts[0].strip()
                        time_info = parts[1].strip()
                        print(f"  {thread_info}: {time_info}")

        except Exception as e:
            print(f"  Error getting CPU usage: {e}")

    def _analyze_thread_stacks(self):
        """Switch to every known thread and print its stack depth and top frames.

        A failure on one thread is reported and does not stop the others.
        The debugger is left on the last thread that was visited.
        """
        print("\nThread Stack Analysis:")

        for thread in self.threads:
            try:
                print(f"\nThread {thread['id']}:")

                # Switch to the thread
                pykd.dbgCommand(f"~{thread['id']}s")

                # Fetch the call stack
                stack_output = pykd.dbgCommand("k")

                # Keep real frames only. The column header may be indented, so
                # it is recognised after stripping leading whitespace.
                frames = [
                    line.strip()
                    for line in stack_output.split('\n')
                    if line.strip() and not line.strip().startswith('#')
                ]
                print(f"  Stack depth: {len(frames)}")

                # Show the first few frames (frames, not raw output lines)
                print("  Top stack frames:")
                for frame in frames[:5]:
                    print(f"    {frame}")

                # Is the thread waiting on something?
                self._check_thread_wait_state(thread['id'])

            except Exception as e:
                print(f"  Error analyzing thread {thread['id']}: {e}")

    def _check_thread_wait_state(self, thread_id):
        """Report whether the current thread looks like it is waiting.

        Best effort only: any error is deliberately ignored.
        NOTE(review): ``!thread`` is presumably a kernel-mode command and may
        yield nothing useful on user-mode targets - confirm.
        """
        try:
            # Detailed thread information
            output = pykd.dbgCommand("!thread")

            if "Waiting" in output:
                print(f"    Thread {thread_id} is in waiting state")

                # Try to identify what the thread is waiting on
                if "WaitForSingleObject" in output or "WaitForMultipleObjects" in output:
                    print(f"    Thread {thread_id} is waiting for object(s)")

        except Exception:
            pass  # optional diagnostic; failure here must not stop the analysis

    def _detect_deadlocks(self):
        """Print locked critical sections and flag a possible deadlock."""
        print("\nDeadlock Detection:")

        try:
            # Locked critical sections
            cs_output = pykd.dbgCommand("!cs -l")

            if "No critical section" in cs_output:
                print("  No locked critical sections found")
            else:
                print("  Locked critical sections detected:")
                for line in cs_output.split('\n'):
                    if line.strip() and not line.startswith('!'):
                        print(f"    {line.strip()}")

            # Automatic hang analysis
            deadlock_output = pykd.dbgCommand("!analyze -hang")
            if "DEADLOCK" in deadlock_output.upper():
                print("  Potential deadlock detected!")

        except Exception as e:
            print(f"  Error during deadlock detection: {e}")

# Usage example: run the thread analysis when the file is executed as a script.
if __name__ == "__main__":
    analyzer = ThreadAnalyzer()
    analyzer.analyze_threads()

4.3 高级Python扩展

内存泄漏检测器

# memory_leak_detector.py
import json
import re
import time
from collections import defaultdict

import pykd

class MemoryLeakDetector:
    """Heuristic leak detector that compares debugger-derived snapshots.

    Each snapshot stores the parsed output of ``!heap -s``, ``!handle`` and
    ``lm``.  Comparing two snapshots prints the deltas and records large
    increases in ``leak_candidates``.
    """

    # Growth between two snapshots above these values is flagged.
    HEAP_GROWTH_THRESHOLD = 1024 * 1024  # bytes of committed heap (1MB)
    HANDLE_GROWTH_THRESHOLD = 100        # number of handles

    def __init__(self):
        self.snapshots = []        # snapshots in the order they were taken
        self.leak_candidates = []  # findings appended by the _compare_* helpers

    @staticmethod
    def _parse_hex(text):
        """Convert debugger-formatted hexadecimal text to an int.

        The debugger prints 64-bit values with a backtick between the high
        and low halves (``00007ff6`12340000``).  ``int`` rejects that
        separator, so it is removed first.

        Raises:
            ValueError: if the remaining text is not hexadecimal.
        """
        return int(text.replace('`', ''), 16)

    def take_snapshot(self, name=""):
        """Capture heap, handle and module information as a new snapshot.

        Args:
            name: Optional label; defaults to ``snapshot_<index>``.

        Returns:
            The snapshot dict that was appended to ``self.snapshots``.
        """
        label = name or f"snapshot_{len(self.snapshots)}"
        print(f"Taking memory snapshot: {label}")

        snapshot = {
            'name': label,
            'timestamp': time.time(),
            'heap_info': self._get_heap_info(),
            'handle_info': self._get_handle_info(),
            'module_info': self._get_module_info()
        }

        self.snapshots.append(snapshot)
        print(f"Snapshot taken: {snapshot['name']}")
        return snapshot

    def _get_heap_info(self):
        """Parse ``!heap -s`` output into heap lines and total sizes.

        Returns:
            dict with ``heaps`` (list of stripped lines), ``total_committed``
            and ``total_reserved`` (ints parsed as hexadecimal).  Errors are
            printed and whatever was collected so far is returned.
        """
        heap_info = {
            'heaps': [],
            'total_committed': 0,
            'total_reserved': 0
        }

        try:
            output = pykd.dbgCommand("!heap -s")

            for line in output.split('\n'):
                if 'Heap' in line and 'at' in line:
                    heap_info['heaps'].append(line.strip())
                elif 'Total committed' in line:
                    match = re.search(r'Total committed\s+(\w+)', line)
                    if match:
                        heap_info['total_committed'] = self._parse_hex(match.group(1))
                elif 'Total reserved' in line:
                    match = re.search(r'Total reserved\s+(\w+)', line)
                    if match:
                        heap_info['total_reserved'] = self._parse_hex(match.group(1))

        except Exception as e:
            print(f"Error getting heap info: {e}")

        return heap_info

    def _get_handle_info(self):
        """Count handles per type from ``!handle`` output.

        Only lines containing both ``Handle`` and ``Type`` with at least
        four whitespace-separated fields are counted; the fourth field is
        taken as the handle type.

        Returns:
            dict with ``total_handles`` and ``handle_types`` (type -> count).
        """
        handle_info = {
            'total_handles': 0,
            'handle_types': defaultdict(int)
        }

        try:
            output = pykd.dbgCommand("!handle")

            for line in output.split('\n'):
                if 'Handle' in line and 'Type' in line:
                    parts = line.split()
                    if len(parts) >= 4:
                        handle_type = parts[3]
                        handle_info['handle_types'][handle_type] += 1
                        handle_info['total_handles'] += 1

        except Exception as e:
            print(f"Error getting handle info: {e}")

        return handle_info

    def _get_module_info(self):
        """Parse ``lm`` output into a module list with address ranges.

        Returns:
            dict with ``modules`` (list of name/start/end/size dicts) and
            ``total_size`` (sum of the module sizes).  Lines whose first two
            fields are not hexadecimal addresses are skipped.
        """
        module_info = {
            'modules': [],
            'total_size': 0
        }

        try:
            output = pykd.dbgCommand("lm")

            for line in output.split('\n'):
                # Skip blank lines and the "start end module name" header.
                if not line.strip() or line.startswith('start'):
                    continue

                parts = line.split()
                if len(parts) < 3:
                    continue

                try:
                    # _parse_hex accepts the backtick-separated 64-bit form.
                    start = self._parse_hex(parts[0])
                    end = self._parse_hex(parts[1])
                except ValueError:
                    continue

                size = end - start
                module_info['modules'].append({
                    'name': parts[2],
                    'start': start,
                    'end': end,
                    'size': size
                })
                module_info['total_size'] += size

        except Exception as e:
            print(f"Error getting module info: {e}")

        return module_info

    def compare_snapshots(self, snapshot1_idx=0, snapshot2_idx=-1):
        """Print the differences between two stored snapshots.

        Args:
            snapshot1_idx: Index of the baseline snapshot (default: first).
            snapshot2_idx: Index of the later snapshot (default: last).

        Does nothing except print a message when fewer than two snapshots
        exist.
        """
        if len(self.snapshots) < 2:
            print("Need at least 2 snapshots to compare")
            return

        snap1 = self.snapshots[snapshot1_idx]
        snap2 = self.snapshots[snapshot2_idx]

        print(f"\nComparing snapshots:")
        print(f"  {snap1['name']} vs {snap2['name']}")

        self._compare_heap_info(snap1['heap_info'], snap2['heap_info'])
        self._compare_handle_info(snap1['handle_info'], snap2['handle_info'])
        self._compare_module_info(snap1['module_info'], snap2['module_info'])

    def _compare_heap_info(self, heap1, heap2):
        """Print heap size deltas and record large committed-memory growth."""
        print("\nHeap Comparison:")

        committed_diff = heap2['total_committed'] - heap1['total_committed']
        reserved_diff = heap2['total_reserved'] - heap1['total_reserved']

        print(f"  Committed memory change: {committed_diff:+,} bytes")
        print(f"  Reserved memory change: {reserved_diff:+,} bytes")

        if committed_diff > self.HEAP_GROWTH_THRESHOLD:
            print(f"  WARNING: Large increase in committed memory!")
            self.leak_candidates.append({
                'type': 'heap_growth',
                'size': committed_diff,
                'description': f"Heap committed memory increased by {committed_diff:,} bytes"
            })

    def _compare_handle_info(self, handle1, handle2):
        """Print handle count deltas and record large handle growth."""
        print("\nHandle Comparison:")

        handle_diff = handle2['total_handles'] - handle1['total_handles']
        print(f"  Total handle change: {handle_diff:+}")

        if handle_diff > self.HANDLE_GROWTH_THRESHOLD:
            print(f"  WARNING: Large increase in handle count!")
            self.leak_candidates.append({
                'type': 'handle_leak',
                'count': handle_diff,
                'description': f"Handle count increased by {handle_diff}"
            })

        # Per-type breakdown: only types whose count increased are listed.
        for handle_type in handle2['handle_types']:
            count1 = handle1['handle_types'].get(handle_type, 0)
            count2 = handle2['handle_types'][handle_type]
            diff = count2 - count1

            if diff > 0:
                print(f"  {handle_type}: +{diff}")

    def _compare_module_info(self, module1, module2):
        """Print the module size delta plus added and removed module names."""
        print("\nModule Comparison:")

        size_diff = module2['total_size'] - module1['total_size']
        print(f"  Total module size change: {size_diff:+,} bytes")

        old_modules = {mod['name'] for mod in module1['modules']}
        new_modules = {mod['name'] for mod in module2['modules']}

        added_modules = new_modules - old_modules
        removed_modules = old_modules - new_modules

        # Sorted so the report is identical from run to run.
        if added_modules:
            print(f"  Added modules: {', '.join(sorted(added_modules))}")

        if removed_modules:
            print(f"  Removed modules: {', '.join(sorted(removed_modules))}")

    def generate_report(self):
        """Print every recorded leak candidate with a recommendation."""
        print("\n=== Memory Leak Detection Report ===")

        if not self.leak_candidates:
            print("No memory leaks detected.")
            return

        print(f"Found {len(self.leak_candidates)} potential leak(s):")

        for i, leak in enumerate(self.leak_candidates, 1):
            print(f"\n{i}. {leak['type'].upper()}:")
            print(f"   {leak['description']}")

            if leak['type'] == 'heap_growth':
                print(f"   Recommendation: Check for memory allocations without corresponding frees")
            elif leak['type'] == 'handle_leak':
                print(f"   Recommendation: Check for unclosed handles (files, registry keys, etc.)")

    def save_snapshots(self, filename):
        """Write all snapshots to ``filename`` as JSON; errors are printed."""
        try:
            with open(filename, 'w') as f:
                json.dump(self.snapshots, f, indent=2)
            print(f"Snapshots saved to {filename}")
        except Exception as e:
            print(f"Error saving snapshots: {e}")

    def load_snapshots(self, filename):
        """Replace the snapshots with those read from a JSON file.

        On error a message is printed and the current snapshots are kept.
        """
        try:
            with open(filename, 'r') as f:
                self.snapshots = json.load(f)
            print(f"Snapshots loaded from {filename}")
        except Exception as e:
            print(f"Error loading snapshots: {e}")

# Usage example: take two snapshots around a user-driven workload,
# compare them, print the report and save the snapshots to disk.
if __name__ == "__main__":
    detector = MemoryLeakDetector()
    
    # Take the initial (baseline) snapshot
    detector.take_snapshot("initial")
    
    # Block until the user has exercised the target and presses Enter
    print("\nPerform operations that might cause memory leaks...")
    print("Press Enter when ready to take second snapshot")
    input()
    
    # Take the second snapshot
    detector.take_snapshot("after_operations")
    
    # Compare the snapshots (first vs. last by default)
    detector.compare_snapshots()
    
    # Generate the report
    detector.generate_report()
    
    # Save the snapshots
    detector.save_snapshots("memory_snapshots.json")

5. 扩展部署和分发

5.1 扩展打包

传统扩展打包

@echo off
REM Packages the WinDbg extension: collects the x64/x86 binaries and the docs
REM under .\release and generates release\install.bat for the end user.
echo Packaging WinDbg Extension...

REM Create the release directory tree (tolerates re-runs)
if not exist release mkdir release
if not exist release\x64 mkdir release\x64
if not exist release\x86 mkdir release\x86
if not exist release\docs mkdir release\docs

REM Copy build outputs and documentation
copy x64\Release\myextension.dll release\x64\
copy x86\Release\myextension.dll release\x86\
copy README.md release\docs\
copy CHANGELOG.md release\docs\

REM Generate the install script. Inside the parenthesised block every literal
REM parenthesis must be caret-escaped, percent signs are doubled once per
REM level of expansion, and "2^^^>nul" is written so that the generated file
REM contains the escaped form "2^>nul" required inside a for /f command.
echo Creating install script...
(
echo @echo off
echo echo Installing MyExtension...
echo.
echo REM Locate the Windows Kits root from the registry
echo for /f "tokens=2*" %%%%a in ^('reg query "HKLM\SOFTWARE\Microsoft\Windows Kits\Installed Roots" /v KitsRoot10 2^^^>nul'^) do set KITS_ROOT=%%%%b
echo.
echo if not defined KITS_ROOT ^(
echo     echo Windows SDK not found!
echo     pause
echo     exit /b 1
echo ^)
echo.
echo REM Copy the extension binaries into the debugger directories
echo copy x64\myextension.dll "%%KITS_ROOT%%Debuggers\x64\"
echo copy x86\myextension.dll "%%KITS_ROOT%%Debuggers\x86\"
echo.
echo echo Installation complete!
echo pause
) > release\install.bat

echo Package created in release\ directory
pause

JavaScript扩展打包

// package.js - 扩展打包脚本
const fs = require('fs');
const path = require('path');
const archiver = require('archiver');

/**
 * Builds the distributable ZIP for the JavaScript extension.
 *
 * Writes the package metadata to package.json, then archives every listed
 * file that exists on disk. Listed files that are missing are reported with
 * a warning instead of being dropped silently, so an incomplete package is
 * noticed. The archive name is derived from the manifest version so the two
 * cannot drift apart.
 */
function packageExtension() {
    const packageInfo = {
        name: "MyJSExtension",
        version: "1.0.0",
        description: "Advanced debugging extension for WinDbg",
        author: "Your Name",
        files: [
            "myextension.js",
            "utils.js",
            "README.md",
            "LICENSE"
        ]
    };

    // Create package.json
    fs.writeFileSync('package.json', JSON.stringify(packageInfo, null, 2));

    // Create the ZIP archive (maximum compression)
    const zipName = `myextension-${packageInfo.version}.zip`;
    const output = fs.createWriteStream(zipName);
    const archive = archiver('zip', { zlib: { level: 9 } });

    // 'close' fires once the archive has been fully written
    output.on('close', () => {
        console.log(`Package created: ${archive.pointer()} bytes`);
    });

    archive.on('error', (err) => {
        throw err;
    });

    archive.pipe(output);

    // Add the listed files, flagging any that are absent
    for (const file of packageInfo.files) {
        if (fs.existsSync(file)) {
            archive.file(file, { name: file });
        } else {
            console.warn(`Warning: listed file not found, skipped: ${file}`);
        }
    }

    archive.finalize();
}

packageExtension();

5.2 扩展安装

自动安装脚本

# install_extension.ps1
#
# Copies a WinDbg extension file into the Debuggers\<Architecture> directory
# of the first Windows Kits installation that contains windbg.exe.
#
# Parameters:
#   -ExtensionPath  Path of the extension file to install (mandatory).
#   -Architecture   Debugger sub-directory to install into (default "x64").
#
# Exit code: 1 on any failure; the "Installation complete!" message is only
# printed when the file was copied and verified.
param(
    [Parameter(Mandatory=$true)]
    [string]$ExtensionPath,
    
    [Parameter(Mandatory=$false)]
    [string]$Architecture = "x64"
)

Write-Host "Installing WinDbg Extension..." -ForegroundColor Green

# Candidate WinDbg installation directories, checked in order
$windbgPaths = @(
    "${env:ProgramFiles(x86)}\Windows Kits\10\Debuggers\$Architecture",
    "${env:ProgramFiles}\Windows Kits\10\Debuggers\$Architecture",
    "${env:ProgramFiles(x86)}\Windows Kits\8.1\Debuggers\$Architecture",
    "${env:ProgramFiles}\Windows Kits\8.1\Debuggers\$Architecture"
)

$windbgPath = $null
foreach ($path in $windbgPaths) {
    if (Test-Path "$path\windbg.exe") {
        $windbgPath = $path
        break
    }
}

if (-not $windbgPath) {
    Write-Error "WinDbg not found. Please install Windows SDK."
    exit 1
}

Write-Host "Found WinDbg at: $windbgPath" -ForegroundColor Yellow

if (-not (Test-Path $ExtensionPath)) {
    Write-Error "Extension file not found: $ExtensionPath"
    exit 1
}

# Copy the extension file
try {
    $fileName = Split-Path $ExtensionPath -Leaf
    $destination = Join-Path $windbgPath $fileName

    # -ErrorAction Stop turns copy failures into terminating errors;
    # without it the catch block below would never run.
    Copy-Item $ExtensionPath $destination -Force -ErrorAction Stop
    Write-Host "Extension installed: $destination" -ForegroundColor Green

    # Verify the installation
    if (-not (Test-Path $destination)) {
        Write-Error "Installation verification failed!"
        exit 1
    }
    Write-Host "Installation verified successfully!" -ForegroundColor Green
} catch {
    Write-Error "Installation failed: $($_.Exception.Message)"
    exit 1
}

Write-Host "Installation complete!" -ForegroundColor Green

5.3 扩展管理

扩展管理器

# extension_manager.py
import os
import json
import shutil
import winreg
from pathlib import Path

class WinDbgExtensionManager:
    """Installs, removes and lists WinDbg extensions.

    Extension files are copied into every detected WinDbg debugger
    directory, and a record of each installation is kept in a JSON file
    (``extensions.json`` in the current working directory).
    """

    CONFIG_FILE = "extensions.json"

    def __init__(self):
        self.windbg_paths = self._find_windbg_paths()
        self.extensions_config = self._load_extensions_config()

    def _find_windbg_paths(self):
        """Locate WinDbg debugger directories via the Windows Kits registry key.

        Returns:
            list of dicts with ``path``, ``architecture`` and ``version`` for
            each architecture directory that contains ``windbg.exe``.  Errors
            are printed and result in a (possibly empty) partial list.
        """
        paths = []

        try:
            with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
                                r"SOFTWARE\Microsoft\Windows Kits\Installed Roots") as key:
                kits_root, _ = winreg.QueryValueEx(key, "KitsRoot10")

                for arch in ["x64", "x86"]:
                    debugger_path = os.path.join(kits_root, "Debuggers", arch)
                    if os.path.exists(os.path.join(debugger_path, "windbg.exe")):
                        paths.append({
                            'path': debugger_path,
                            'architecture': arch,
                            'version': self._get_windbg_version(debugger_path)
                        })

        except Exception as e:
            print(f"Error finding WinDbg paths: {e}")

        return paths

    def _get_windbg_version(self, path):
        """Return a version label for the WinDbg found under ``path``.

        Simplified detection: returns ``"10.0"`` whenever ``windbg.exe``
        exists in ``path`` and ``"Unknown"`` otherwise.
        """
        try:
            windbg_exe = os.path.join(path, "windbg.exe")
            if os.path.exists(windbg_exe):
                return "10.0"
        except Exception:
            pass
        return "Unknown"

    def _load_extensions_config(self):
        """Load the extension records, falling back to an empty config.

        A file that cannot be parsed, or that does not contain an
        ``extensions`` list, is reported and ignored.
        """
        if os.path.exists(self.CONFIG_FILE):
            try:
                with open(self.CONFIG_FILE, 'r') as f:
                    config = json.load(f)
                if isinstance(config, dict) and isinstance(config.get('extensions'), list):
                    return config
                print("Error loading config: unexpected structure")
            except Exception as e:
                print(f"Error loading config: {e}")

        return {'extensions': []}

    def _save_extensions_config(self):
        """Write the extension records to the config file; errors are printed."""
        try:
            with open(self.CONFIG_FILE, 'w') as f:
                json.dump(self.extensions_config, f, indent=2)
        except Exception as e:
            print(f"Error saving config: {e}")

    def install_extension(self, extension_path, name=None, description=None):
        """Copy an extension into every detected WinDbg directory.

        Args:
            extension_path: Path of the extension file to install.
            name: Record name; defaults to the file name without extension.
            description: Free-text description stored in the record.

        Returns:
            True if the file was copied to at least one directory.  The
            stored record lists only the directories where the copy
            succeeded, and replaces any earlier record with the same name.
        """
        if not os.path.exists(extension_path):
            print(f"Extension file not found: {extension_path}")
            return False

        file_name = os.path.basename(extension_path)
        extension_name = name or os.path.splitext(file_name)[0]

        print(f"Installing extension: {extension_name}")

        installed_paths = []
        for windbg_info in self.windbg_paths:
            try:
                dest_path = os.path.join(windbg_info['path'], file_name)
                shutil.copy2(extension_path, dest_path)
                print(f"  Installed to {windbg_info['architecture']}: {dest_path}")
                installed_paths.append(windbg_info['path'])
            except Exception as e:
                print(f"  Failed to install to {windbg_info['architecture']}: {e}")

        if not installed_paths:
            return False

        extension_info = {
            'name': extension_name,
            'file': file_name,
            'description': description or f"Extension {extension_name}",
            'installed_paths': installed_paths,
            'type': self._detect_extension_type(extension_path)
        }

        # Drop any previous record with this name so a reinstall does not
        # leave duplicates behind; mutate in place to keep the list object.
        extensions = self.extensions_config['extensions']
        extensions[:] = [ext for ext in extensions if ext.get('name') != extension_name]
        extensions.append(extension_info)
        self._save_extensions_config()

        print(f"Extension {extension_name} installed successfully!")
        return True

    def _detect_extension_type(self, extension_path):
        """Map the file extension to 'native', 'javascript', 'python' or 'unknown'."""
        ext = os.path.splitext(extension_path)[1].lower()
        if ext == '.dll':
            return 'native'
        elif ext == '.js':
            return 'javascript'
        elif ext == '.py':
            return 'python'
        else:
            return 'unknown'

    def uninstall_extension(self, extension_name):
        """Remove an installed extension's files and its record.

        Returns:
            True when the record was removed.  That happens when at least
            one file was deleted, or when no deletion failed (files that are
            already gone count as removed, so a stale record can be cleaned
            up).  False if the name is unknown or every deletion failed.
        """
        extension_info = None
        for ext in self.extensions_config['extensions']:
            if ext['name'] == extension_name:
                extension_info = ext
                break

        if not extension_info:
            print(f"Extension {extension_name} not found")
            return False

        print(f"Uninstalling extension: {extension_name}")

        success_count = 0
        failure_count = 0
        for path in extension_info['installed_paths']:
            try:
                file_path = os.path.join(path, extension_info['file'])
                if os.path.exists(file_path):
                    os.remove(file_path)
                    print(f"  Removed from: {path}")
                    success_count += 1
            except Exception as e:
                print(f"  Failed to remove from {path}: {e}")
                failure_count += 1

        if success_count > 0 or failure_count == 0:
            self.extensions_config['extensions'].remove(extension_info)
            self._save_extensions_config()

            print(f"Extension {extension_name} uninstalled successfully!")
            return True

        return False

    def list_extensions(self):
        """Print every recorded extension with its type, file and description."""
        print("Installed Extensions:")

        if not self.extensions_config['extensions']:
            print("  No extensions installed")
            return

        for ext in self.extensions_config['extensions']:
            print(f"\n  Name: {ext['name']}")
            print(f"  Type: {ext['type']}")
            print(f"  File: {ext['file']}")
            print(f"  Description: {ext['description']}")
            print(f"  Installed paths: {len(ext['installed_paths'])}")

    def verify_installations(self):
        """Print, per recorded extension, which installed files still exist."""
        print("Verifying extension installations...")

        for ext in self.extensions_config['extensions']:
            print(f"\nVerifying {ext['name']}:")

            for path in ext['installed_paths']:
                file_path = os.path.join(path, ext['file'])
                if os.path.exists(file_path):
                    print(f"  ✓ {path}")
                else:
                    print(f"  ✗ {path} (missing)")

# Usage example: print the detected WinDbg installations, then list and
# verify the recorded extensions.
if __name__ == "__main__":
    manager = WinDbgExtensionManager()
    
    print("WinDbg Extension Manager")
    print("========================")
    
    # Show the WinDbg installations that were found
    print(f"\nFound {len(manager.windbg_paths)} WinDbg installation(s):")
    for info in manager.windbg_paths:
        print(f"  {info['architecture']}: {info['path']} (v{info['version']})")
    
    # List the installed extensions
    manager.list_extensions()
    
    # Verify the installations
    manager.verify_installations()

6. 扩展调试和测试

6.1 扩展调试技巧

调试输出

// Debug output macros.
// When _DEBUG is defined they forward to dprintf with a "[DEBUG] " prefix;
// otherwise they expand to nothing, so their arguments are not evaluated.
// DBG_PRINT pastes "[DEBUG] " directly in front of fmt, so fmt must be a
// string literal.
// NOTE(review): DBG_PRINT(fmt) with no further arguments leaves a trailing
// comma before __VA_ARGS__; whether that compiles depends on the compiler.
#ifdef _DEBUG
#define DBG_PRINT(fmt, ...) dprintf("[DEBUG] " fmt, __VA_ARGS__)
#define DBG_ENTER() dprintf("[DEBUG] Entering %s\n", __FUNCTION__)
#define DBG_EXIT() dprintf("[DEBUG] Exiting %s\n", __FUNCTION__)
#else
#define DBG_PRINT(fmt, ...)
#define DBG_ENTER()
#define DBG_EXIT()
#endif

// 使用示例
DECLARE_API(debug_example)
{
    // Example command showing the DBG_* tracing macros around an
    // address-expression parse.
    DBG_ENTER();

    DBG_PRINT("Processing arguments: %s\n", args ? args : "(none)");

    // The extension logic only runs for a non-empty argument string.
    const bool hasArguments = (args != nullptr) && (*args != '\0');
    if (hasArguments)
    {
        const ULONG64 address = GetExpression(args);
        DBG_PRINT("Parsed address: 0x%I64x\n", address);

        if (address == 0)
        {
            // A zero result is treated as an invalid expression.
            DBG_PRINT("Invalid address expression: %s\n", args);
            dprintf("Error: Invalid address\n");
        }
        else
        {
            // Handle the address
            dprintf("Processing address 0x%I64x\n", address);
        }
    }

    DBG_EXIT();
}

错误处理

// Error-handling macros.
// Both macros execute a bare `return;` on failure, so they can only be used
// inside functions that return void (such as DECLARE_API bodies).
// Arguments are evaluated exactly once and the temporaries carry a trailing
// underscore so they cannot capture identifiers the caller passes in
// (e.g. a caller variable named `hr` or `bytesRead`).

// Calls func(...); if the returned HRESULT satisfies FAILED(), prints the
// function name with the HRESULT and returns from the enclosing function.
#define SAFE_CALL(func, ...) \
    do { \
        HRESULT safeCallResult_ = func(__VA_ARGS__); \
        if (FAILED(safeCallResult_)) { \
            dprintf("Error in %s: 0x%08x\n", #func, safeCallResult_); \
            return; \
        } \
    } while(0)

// Reads `size` bytes at `addr` into `buffer`; on failure or a short read,
// prints the address and returns from the enclosing function. The address is
// converted to ULONG64 up front so it always matches the %I64x format.
#define SAFE_READ_MEMORY(addr, buffer, size) \
    do { \
        const ULONG64 safeReadAddr_ = (addr); \
        const ULONG safeReadSize_ = static_cast<ULONG>(size); \
        ULONG safeReadDone_ = 0; \
        if (!ReadMemory(safeReadAddr_, (buffer), safeReadSize_, &safeReadDone_) || \
            safeReadDone_ != safeReadSize_) { \
            dprintf("Failed to read memory at 0x%I64x\n", safeReadAddr_); \
            return; \
        } \
    } while(0)

// 使用示例
DECLARE_API(safe_example)
{
    // Dumps 16 bytes at the address given on the command line.
    const bool missingArgument = (args == nullptr) || (*args == '\0');
    if (missingArgument)
    {
        dprintf("Usage: !safe_example <address>\n");
        return;
    }

    const ULONG64 address = GetExpression(args);
    if (!address)
    {
        dprintf("Error: Invalid address expression\n");
        return;
    }

    // Guarded read: the macro returns from this function on failure.
    UCHAR buffer[16];
    SAFE_READ_MEMORY(address, buffer, sizeof(buffer));

    // Print the bytes as two-digit hex values on one line.
    dprintf("Data at 0x%I64x: ", address);
    for (const UCHAR byteValue : buffer)
    {
        dprintf("%02x ", byteValue);
    }
    dprintf("\n");
}

6.2 单元测试

扩展测试框架

// test_framework.h
#pragma once

#include <vector>
#include <string>
#include <functional>

// Minimal in-debugger test runner: tests are registered as bool() callables
// and executed in registration order, with results printed via dprintf.
class ExtensionTestFramework
{
public:
    // One registered test: display name, callable, optional description.
    struct TestCase
    {
        std::string name;
        std::function<bool()> test_func;
        std::string description;
    };

    // Returns the single shared framework instance.
    static ExtensionTestFramework& Instance()
    {
        static ExtensionTestFramework instance;
        return instance;
    }

    // Appends a test to the list; no de-duplication is performed.
    void RegisterTest(const std::string& name, std::function<bool()> test_func, const std::string& description = "")
    {
        TestCase entry;
        entry.name = name;
        entry.test_func = test_func;
        entry.description = description;
        tests_.push_back(entry);
    }

    // Runs every registered test. A test passes when it returns true; a
    // false return or any thrown exception counts as a failure.
    void RunAllTests()
    {
        dprintf("Running %d test(s)...\n", static_cast<int>(tests_.size()));

        int passed = 0;
        int failed = 0;

        for (auto it = tests_.begin(); it != tests_.end(); ++it)
        {
            const char* testName = it->name.c_str();

            dprintf("\nRunning test: %s\n", testName);
            if (!it->description.empty())
            {
                dprintf("Description: %s\n", it->description.c_str());
            }

            try
            {
                const bool ok = it->test_func();
                if (ok)
                {
                    dprintf("PASSED: %s\n", testName);
                    ++passed;
                }
                else
                {
                    dprintf("FAILED: %s\n", testName);
                    ++failed;
                }
            }
            catch (...)
            {
                dprintf("EXCEPTION: %s\n", testName);
                ++failed;
            }
        }

        dprintf("\nTest Results: %d passed, %d failed\n", passed, failed);
    }

private:
    std::vector<TestCase> tests_;
};

// Test helper macros.
// REGISTER_TEST forwards to ExtensionTestFramework::Instance().RegisterTest.
// ASSERT_TRUE / ASSERT_EQ print a message via dprintf and execute
// `return false;` on failure, so they must be used inside a function whose
// return type accepts `false`. The messages show the source text of the
// arguments (stringized with #), not their runtime values.
#define REGISTER_TEST(name, func, desc) \
    ExtensionTestFramework::Instance().RegisterTest(name, func, desc)

#define ASSERT_TRUE(condition) \
    do { \
        if (!(condition)) { \
            dprintf("Assertion failed: %s\n", #condition); \
            return false; \
        } \
    } while(0)

#define ASSERT_EQ(expected, actual) \
    do { \
        if ((expected) != (actual)) { \
            dprintf("Assertion failed: expected %s, got %s\n", #expected, #actual); \
            return false; \
        } \
    } while(0)

测试用例实现

// tests.cpp
#include "test_framework.h"

// 测试内存读取功能
bool TestMemoryRead()
{
    // 获取一个已知的有效地址
    ULONG64 address = GetExpression("@eip");
    ASSERT_TRUE(address != 0);
    
    // 尝试读取内存
    UCHAR buffer[4];
    ULONG bytesRead;
    BOOL result = ReadMemory(address, buffer, sizeof(buffer), &bytesRead);
    
    ASSERT_TRUE(result);
    ASSERT_EQ(sizeof(buffer), bytesRead);
    
    return true;
}

// 测试符号解析功能
bool TestSymbolResolution()
{
    // 获取当前指令地址
    ULONG64 address = GetExpression("@eip");
    ASSERT_TRUE(address != 0);
    
    // 尝试解析符号
    CHAR symbolName[256];
    ULONG64 displacement;
    
    if (ResolveSymbol(address, symbolName, sizeof(symbolName), &displacement))
    {
        dprintf("Symbol: %s+0x%I64x\n", symbolName, displacement);
        return true;
    }
    
    // 符号解析失败不一定是错误(可能没有符号)
    dprintf("No symbol found for address 0x%I64x\n", address);
    return true;
}

// 测试表达式求值
bool TestExpressionEvaluation()
{
    // 测试简单表达式
    ULONG64 result1 = GetExpression("1+1");
    ASSERT_EQ(2, result1);
    
    // 测试十六进制
    ULONG64 result2 = GetExpression("0x10");
    ASSERT_EQ(16, result2);
    
    // 测试寄存器
    ULONG64 eip = GetExpression("@eip");
    ASSERT_TRUE(eip != 0);
    
    return true;
}

// Registers all tests with the framework.
// Registration happens only on the first call: the framework appends without
// de-duplicating, so registering again on every call would make each test
// run one more time per invocation of the calling command.
void RegisterAllTests()
{
    static bool registered = false;
    if (registered)
    {
        return;
    }
    registered = true;

    REGISTER_TEST("MemoryRead", TestMemoryRead, "Test memory reading functionality");
    REGISTER_TEST("SymbolResolution", TestSymbolResolution, "Test symbol resolution");
    REGISTER_TEST("ExpressionEvaluation", TestExpressionEvaluation, "Test expression evaluation");
}

// Extension command that registers the tests and runs them.
DECLARE_API(runtests)
{
    dprintf("=== Extension Unit Tests ===\n");

    RegisterAllTests();

    ExtensionTestFramework& framework = ExtensionTestFramework::Instance();
    framework.RunAllTests();
}

6.3 性能测试

性能测量工具

// performance.h
#pragma once

#include <windows.h>

class PerformanceTimer
{
public:
    PerformanceTimer()
    {
        QueryPerformanceFrequency(&frequency_);
        Reset();
    }
    
    void Reset()
    {
        QueryPerformanceCounter(&start_time_);
    }
    
    double ElapsedMilliseconds()
    {
        LARGE_INTEGER end_time;
        QueryPerformanceCounter(&end_time);
        
        double elapsed = static_cast<double>(end_time.QuadPart - start_time_.QuadPart);
        return (elapsed * 1000.0) / frequency_.QuadPart;
    }
    
    double ElapsedMicroseconds()
    {
        return ElapsedMilliseconds() * 1000.0;
    }
    
private:
    LARGE_INTEGER frequency_;
    LARGE_INTEGER start_time_;
};

// Performance test macros.
// PERF_TEST_BEGIN opens a do { ... } block that PERF_TEST_END closes, so the
// two must always be used as a pair in the same scope; the code placed
// between them is what gets timed. `timer` and `elapsed` are local to that
// block.
#define PERF_TEST_BEGIN(name) \
    do { \
        dprintf("Performance test: %s\n", name); \
        PerformanceTimer timer; \
        timer.Reset();

#define PERF_TEST_END() \
        double elapsed = timer.ElapsedMilliseconds(); \
        dprintf("Elapsed time: %.3f ms\n", elapsed); \
    } while(0)

性能测试示例

// Extension command that times memory reads, symbol resolution and
// expression evaluation. The instruction pointer is obtained through the
// architecture-neutral "@$ip" pseudo-register so the measurements also work
// on x64 targets ("@eip" exists only on x86 targets).
DECLARE_API(perftest)
{
    dprintf("=== Performance Tests ===\n");

    // Memory read performance
    PERF_TEST_BEGIN("Memory Read (1000 iterations)");
    {
        ULONG64 address = GetExpression("@$ip");
        UCHAR buffer[16];

        for (int i = 0; i < 1000; i++)
        {
            ReadMemory(address, buffer, sizeof(buffer), NULL);
        }
    }
    PERF_TEST_END();

    // Symbol resolution performance
    PERF_TEST_BEGIN("Symbol Resolution (100 iterations)");
    {
        ULONG64 address = GetExpression("@$ip");
        CHAR symbolName[256];
        ULONG64 displacement;

        for (int i = 0; i < 100; i++)
        {
            ResolveSymbol(address, symbolName, sizeof(symbolName), &displacement);
        }
    }
    PERF_TEST_END();

    // Expression evaluation performance
    PERF_TEST_BEGIN("Expression Evaluation (1000 iterations)");
    {
        for (int i = 0; i < 1000; i++)
        {
            GetExpression("@$ip");
        }
    }
    PERF_TEST_END();
}

7. 最佳实践和注意事项

7.1 开发最佳实践

代码规范

// 1. 使用一致的命名约定
// 函数名:PascalCase
// 变量名:camelCase
// 常量:UPPER_CASE
// 扩展命令:lowercase

// 2. 添加详细注释
/**
 * @brief Analyze process heap information
 * @param args Command-line arguments; an optional heap address
 * @return None
 * @note When no argument is supplied, all heaps are analyzed
 *
 * The body below is a placeholder; the behaviour described above is the
 * intended contract, not something this stub implements.
 */
DECLARE_API(analyzeheap)
{
    // Implementation goes here
}

// 3. 使用错误处理
// 总是检查API调用的返回值
// 提供有意义的错误消息
// 优雅地处理异常情况

// 4. 内存管理
// 及时释放分配的内存
// 避免内存泄漏
// 使用RAII模式

安全考虑

// 1. 输入验证
DECLARE_API(secure_example)
{
    // Input-validation example: every check returns early with a message.

    // 1) An argument string must be present and non-empty.
    const bool missingArgument = (args == nullptr) || (*args == '\0');
    if (missingArgument)
    {
        dprintf("Usage: !secure_example <address>\n");
        return;
    }

    // 2) The expression must not evaluate to 0 or BADADDR.
    const ULONG64 address = GetExpression(args);
    const bool isNull = (address == 0);
    if (isNull || address == BADADDR)
    {
        dprintf("Error: Invalid address\n");
        return;
    }

    // 3) The address must pass the accessibility check.
    const bool accessible = IsAddressValid(address);
    if (!accessible)
    {
        dprintf("Error: Address not accessible\n");
        return;
    }

    // Safe processing goes here
}

// 2. 缓冲区保护
// Copies src into dest, truncating to fit destSize (strncpy_s with
// _TRUNCATE). Does nothing when either pointer is null or destSize is zero.
void SafeStringCopy(char* dest, size_t destSize, const char* src)
{
    if (dest == nullptr || src == nullptr || destSize == 0)
    {
        return;
    }

    strncpy_s(dest, destSize, src, _TRUNCATE);
}

// 3. 异常处理
DECLARE_API(exception_safe)
{
    // The guarded block is wrapped in __try/__except; the handler filter is
    // EXCEPTION_EXECUTE_HANDLER, and the handler only prints a message.
    __try
    {
        // Code that may raise an exception
        ULONG64 address = GetExpression(args);
        UCHAR buffer[16];
        ReadMemory(address, buffer, sizeof(buffer), NULL);
    }
    __except(EXCEPTION_EXECUTE_HANDLER)
    {
        dprintf("Exception occurred during operation\n");
    }
}

7.2 性能优化

缓存策略

// Symbol cache: remembers the "name+0xdisplacement" text resolved for an
// address so repeated lookups skip ResolveSymbol. At most MAX_CACHE_SIZE
// entries are stored; once full, lookups still work but are not cached.
class SymbolCache
{
private:
    std::map<ULONG64, std::string> cache_;
    static const size_t MAX_CACHE_SIZE = 1000;

    // Formats value as lowercase hexadecimal digits without a prefix.
    static std::string ToHexString(ULONG64 value)
    {
        static const char kDigits[] = "0123456789abcdef";
        std::string digits;
        do
        {
            digits.insert(digits.begin(), kDigits[value & 0xF]);
            value >>= 4;
        } while (value != 0);
        return digits;
    }

public:
    // Looks up the symbol text for address.
    // Returns true and fills `symbol` on a cache hit or successful
    // resolution; returns false (leaving `symbol` untouched) otherwise.
    bool GetSymbol(ULONG64 address, std::string& symbol)
    {
        const auto it = cache_.find(address);
        if (it != cache_.end())
        {
            symbol = it->second;
            return true;
        }

        // Cache miss: resolve the symbol
        CHAR symbolName[256];
        ULONG64 displacement = 0;

        if (!ResolveSymbol(address, symbolName, sizeof(symbolName), &displacement))
        {
            return false;
        }

        // The displacement follows a "0x" prefix, so it must be rendered in
        // hexadecimal (std::to_string would emit decimal digits).
        symbol = std::string(symbolName) + "+0x" + ToHexString(displacement);

        // Store the result while there is room
        if (cache_.size() < MAX_CACHE_SIZE)
        {
            cache_[address] = symbol;
        }

        return true;
    }

    // Drops every cached entry.
    void ClearCache()
    {
        cache_.clear();
    }
};

// 全局符号缓存
static SymbolCache g_symbolCache;

批量操作优化

// 批量内存读取
class BatchMemoryReader
{
public:
    struct ReadRequest
    {
        ULONG64 address;
        PVOID buffer;
        ULONG size;
        PULONG bytesRead;
    };
    
    bool BatchRead(const std::vector<ReadRequest>& requests)
    {
        bool allSuccess = true;
        
        for (const auto& req : requests)
        {
            if (!ReadMemory(req.address, req.buffer, req.size, req.bytesRead))
            {
                allSuccess = false;
                if (req.bytesRead)
                {
                    *req.bytesRead = 0;
                }
            }
        }
        
        return allSuccess;
    }
};

7.3 兼容性考虑

版本兼容性

// 检查WinDbg版本
// Compares the version returned by ExtensionApiVersion() with the required
// major/minor pair. Returns true when it is equal or newer; otherwise prints
// a warning and returns false.
bool CheckWinDbgVersion(USHORT requiredMajor, USHORT requiredMinor)
{
    const LPEXT_API_VERSION reported = ExtensionApiVersion();

    const bool newerMajor = reported->MajorVersion > requiredMajor;
    const bool sameMajorEnoughMinor =
        (reported->MajorVersion == requiredMajor) && (reported->MinorVersion >= requiredMinor);

    if (!newerMajor && !sameMajorEnoughMinor)
    {
        dprintf("Warning: This extension requires WinDbg version %d.%d or higher\n",
               requiredMajor, requiredMinor);
        return false;
    }

    return true;
}

// 条件功能
DECLARE_API(version_aware)
{
    // Pick the feature set based on the 10.0 version check.
    const bool advancedAvailable = CheckWinDbgVersion(10, 0);
    if (!advancedAvailable)
    {
        // Fall back to the compatible feature set
        dprintf("Using compatibility mode\n");
        return;
    }

    // Use the newer features
    dprintf("Using advanced features\n");
}

架构兼容性

// 检查目标架构
// Returns true when the "@$ptrsize" expression evaluates to 8.
bool IsTarget64Bit()
{
    const auto pointerSize = ExtensionApis.lpGetExpression("@$ptrsize");
    return pointerSize == 8;
}

DECLARE_API(arch_aware)
{
    // Report the target's pointer width and branch on it.
    if (!IsTarget64Bit())
    {
        dprintf("Target is 32-bit\n");
        // 32-bit specific handling goes here
        return;
    }

    dprintf("Target is 64-bit\n");
    // 64-bit specific handling goes here
}

8. 总结

WinDbg扩展开发是一个强大的调试增强技术,通过本章的学习,您应该掌握:

8.1 核心技能

  • 传统C/C++扩展开发
  • JavaScript扩展开发
  • Python扩展开发
  • 扩展部署和管理

8.2 最佳实践

  • 代码规范和安全考虑
  • 性能优化技巧
  • 兼容性处理
  • 测试和调试方法

8.3 进阶方向

  • 复杂数据结构可视化
  • 自动化分析工具
  • 企业级扩展管理
  • 与其他调试工具集成

通过扩展开发,您可以大大提高调试效率,创建专门针对特定应用或问题域的调试工具,使WinDbg成为更加强大和个性化的调试平台。