1. WinDbg扩展概述
1.1 扩展类型
WinDbg支持多种类型的扩展,每种都有其特定的用途和优势:
传统扩展(Legacy Extensions)
- 特点:基于C/C++开发,使用WinDbg SDK
- 优势:性能高,功能强大,访问底层API
- 适用场景:复杂的调试分析,性能敏感的操作
JavaScript扩展
- 特点:基于JavaScript,使用数据模型API
- 优势:开发简单,调试方便,动态加载
- 适用场景:快速原型开发,数据分析脚本
Python扩展
- 特点:基于Python,通过pykd模块
- 优势:语法简洁,库丰富,易于维护
- 适用场景:数据处理,自动化分析
1.2 扩展架构
WinDbg扩展架构图
┌─────────────────────────────────────────────────────────┐
│ WinDbg Engine │
├─────────────────────────────────────────────────────────┤
│ Extension Manager │
├─────────────────┬─────────────────┬─────────────────────┤
│ │ │ │
│ Legacy │ JavaScript │ Python │
│ Extensions │ Extensions │ Extensions │
│ (.dll) │ (.js) │ (.py) │
│ │ │ │
├─────────────────┼─────────────────┼─────────────────────┤
│ │ │ │
│ WinDbg SDK │ Data Model │ PyKD │
│ APIs │ APIs │ Module │
│ │ │ │
└─────────────────┴─────────────────┴─────────────────────┘
扩展加载机制
# 加载扩展
.load <扩展路径> # 加载传统扩展
.scriptload <脚本路径> # 加载JavaScript扩展
!py <Python命令> # 执行Python扩展
# 卸载扩展
.unload <扩展名> # 卸载传统扩展
.scriptunload <脚本名> # 卸载JavaScript扩展
1.3 开发环境准备
开发工具
# Visual Studio(推荐2019或更高版本)
- C++ 开发工具
- Windows SDK
- WinDbg SDK
# 替代工具
- Visual Studio Code
- CLion
- Qt Creator
SDK和库
# Windows SDK
- 包含调试API头文件
- 提供调试引擎接口
# WinDbg SDK
- 扩展开发专用SDK
- 包含示例代码
# 第三方库
- Boost(可选)
- STL(标准库)
2. 传统扩展开发
2.1 开发环境搭建
项目创建
// 创建新的DLL项目
// 项目设置:
// - 配置类型:动态库(.dll)
// - 字符集:Unicode
// - 平台工具集:v142或更高
// 包含必要的头文件
#include <windows.h>
#include <dbgeng.h>
#include <dbghelp.h>
#include <wdbgexts.h>
// 链接必要的库
#pragma comment(lib, "dbgeng.lib")
#pragma comment(lib, "dbghelp.lib")
基本框架
// extension.cpp
#include "stdafx.h"
// 全局变量
WINDBG_EXTENSION_APIS ExtensionApis;
HANDLE hCurrentProcess;
HANDLE hCurrentThread;
DWORD dwCurrentPc;
DWORD dwProcessor;
HMODULE hModule;
// DLL入口点
BOOL APIENTRY DllMain(HMODULE hModule, DWORD ul_reason_for_call, LPVOID lpReserved)
{
switch (ul_reason_for_call)
{
case DLL_PROCESS_ATTACH:
::hModule = hModule;
break;
case DLL_THREAD_ATTACH:
case DLL_THREAD_DETACH:
case DLL_PROCESS_DETACH:
break;
}
return TRUE;
}
// 扩展初始化
VOID WinDbgExtensionDllInit(PWINDBG_EXTENSION_APIS lpExtensionApis, USHORT MajorVersion, USHORT MinorVersion)
{
ExtensionApis = *lpExtensionApis;
// 保存调试器信息
hCurrentProcess = ExtensionApis.lpGetCurrentProcessHandle();
hCurrentThread = ExtensionApis.lpGetCurrentThreadHandle();
dwCurrentPc = ExtensionApis.lpGetExpression("@eip");
dwProcessor = ExtensionApis.lpGetExpression("@$proc");
dprintf("Extension loaded successfully\n");
}
// 扩展版本信息
LPEXT_API_VERSION ExtensionApiVersion(void)
{
static EXT_API_VERSION ApiVersion =
{
5, // MajorVersion
2, // MinorVersion
EXT_API_VERSION_NUMBER64,
0 // Reserved
};
return &ApiVersion;
}
// 扩展清理
VOID CheckVersion(void)
{
// 版本检查代码
}
2.2 基本扩展命令
简单命令实现
// 声明扩展命令
DECLARE_API(hello)
{
dprintf("Hello from WinDbg extension!\n");
dprintf("Current process: 0x%p\n", hCurrentProcess);
dprintf("Current thread: 0x%p\n", hCurrentThread);
dprintf("Current PC: 0x%08x\n", dwCurrentPc);
}
// 带参数的命令
DECLARE_API(showmem)
{
ULONG64 address = 0;
ULONG size = 16;
// 解析参数
if (args && *args)
{
address = GetExpression(args);
// 查找大小参数
PSTR sizeArg = strchr(args, ' ');
if (sizeArg)
{
size = (ULONG)GetExpression(sizeArg + 1);
}
}
if (address == 0)
{
dprintf("Usage: !showmem <address> [size]\n");
return;
}
// 读取内存
UCHAR buffer[256];
ULONG bytesRead;
if (ReadMemory(address, buffer, min(size, sizeof(buffer)), &bytesRead))
{
dprintf("Memory at 0x%I64x:\n", address);
for (ULONG i = 0; i < bytesRead; i += 16)
{
dprintf("%08I64x: ", address + i);
// 十六进制显示
for (ULONG j = 0; j < 16 && (i + j) < bytesRead; j++)
{
dprintf("%02x ", buffer[i + j]);
}
// ASCII显示
dprintf(" ");
for (ULONG j = 0; j < 16 && (i + j) < bytesRead; j++)
{
UCHAR c = buffer[i + j];
dprintf("%c", (c >= 32 && c <= 126) ? c : '.');
}
dprintf("\n");
}
}
else
{
dprintf("Failed to read memory at 0x%I64x\n", address);
}
}
复杂命令实现
// 堆栈分析命令
DECLARE_API(analyzestack)
{
ULONG64 stackBase, stackLimit;
ULONG64 currentSp;
// 获取堆栈信息
if (GetCurrentThreadStackLimits(&stackLimit, &stackBase) != S_OK)
{
dprintf("Failed to get stack limits\n");
return;
}
currentSp = GetExpression("@esp");
dprintf("Stack Analysis:\n");
dprintf("Stack Base: 0x%I64x\n", stackBase);
dprintf("Stack Limit: 0x%I64x\n", stackLimit);
dprintf("Current SP: 0x%I64x\n", currentSp);
dprintf("Stack Size: %I64d bytes\n", stackBase - stackLimit);
dprintf("Used Stack: %I64d bytes\n", stackBase - currentSp);
// 分析堆栈内容
ULONG64 addr = currentSp;
while (addr < stackBase)
{
ULONG64 value;
if (ReadMemory(addr, &value, sizeof(value), NULL))
{
// 检查是否是有效的代码地址
CHAR symbolName[256];
ULONG64 displacement;
if (GetSymbol(value, symbolName, &displacement) == S_OK)
{
dprintf("0x%I64x: 0x%I64x -> %s+0x%I64x\n",
addr, value, symbolName, displacement);
}
}
addr += sizeof(ULONG64);
}
}
2.3 高级功能实现
符号解析
// 符号解析辅助函数
BOOL ResolveSymbol(ULONG64 address, PSTR symbolName, ULONG nameSize, PULONG64 displacement)
{
SYMBOL_INFO_PACKAGE sip;
sip.si.SizeOfStruct = sizeof(SYMBOL_INFO);
sip.si.MaxNameLen = MAX_SYM_NAME;
if (SymFromAddr(hCurrentProcess, address, displacement, &sip.si))
{
strncpy_s(symbolName, nameSize, sip.si.Name, _TRUNCATE);
return TRUE;
}
return FALSE;
}
// 模块信息获取
DECLARE_API(modinfo)
{
ULONG64 moduleBase = 0;
if (args && *args)
{
moduleBase = GetExpression(args);
}
else
{
moduleBase = GetExpression("@eip");
moduleBase = GetModuleBase(moduleBase);
}
if (moduleBase == 0)
{
dprintf("Invalid module address\n");
return;
}
// 获取模块信息
IMAGEHLP_MODULE64 moduleInfo;
moduleInfo.SizeOfStruct = sizeof(moduleInfo);
if (SymGetModuleInfo64(hCurrentProcess, moduleBase, &moduleInfo))
{
dprintf("Module Information:\n");
dprintf("Base Address: 0x%I64x\n", moduleInfo.BaseOfImage);
dprintf("Image Size: 0x%x\n", moduleInfo.ImageSize);
dprintf("Module Name: %s\n", moduleInfo.ModuleName);
dprintf("Image Name: %s\n", moduleInfo.ImageName);
dprintf("Loaded Image Name: %s\n", moduleInfo.LoadedImageName);
dprintf("Symbol Type: %d\n", moduleInfo.SymType);
dprintf("PDB File: %s\n", moduleInfo.CVData);
}
else
{
dprintf("Failed to get module information\n");
}
}
内存分析
// 内存区域分析
DECLARE_API(memregion)
{
ULONG64 address = 0;
if (args && *args)
{
address = GetExpression(args);
}
else
{
address = GetExpression("@eip");
}
MEMORY_BASIC_INFORMATION64 mbi;
if (VirtualQueryEx(hCurrentProcess, (LPCVOID)address, (PMEMORY_BASIC_INFORMATION)&mbi, sizeof(mbi)))
{
dprintf("Memory Region Information:\n");
dprintf("Base Address: 0x%I64x\n", (ULONG64)mbi.BaseAddress);
dprintf("Allocation Base: 0x%I64x\n", (ULONG64)mbi.AllocationBase);
dprintf("Region Size: 0x%I64x\n", mbi.RegionSize);
dprintf("State: ");
switch (mbi.State)
{
case MEM_COMMIT:
dprintf("Committed\n");
break;
case MEM_FREE:
dprintf("Free\n");
break;
case MEM_RESERVE:
dprintf("Reserved\n");
break;
default:
dprintf("Unknown (0x%x)\n", mbi.State);
break;
}
dprintf("Protection: ");
switch (mbi.Protect)
{
case PAGE_EXECUTE:
dprintf("Execute\n");
break;
case PAGE_EXECUTE_READ:
dprintf("Execute/Read\n");
break;
case PAGE_EXECUTE_READWRITE:
dprintf("Execute/Read/Write\n");
break;
case PAGE_READONLY:
dprintf("Read Only\n");
break;
case PAGE_READWRITE:
dprintf("Read/Write\n");
break;
default:
dprintf("Other (0x%x)\n", mbi.Protect);
break;
}
}
else
{
dprintf("Failed to query memory at 0x%I64x\n", address);
}
}
2.4 扩展编译和部署
项目配置
// stdafx.h
#pragma once
#include "targetver.h"
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// WinDbg扩展头文件
#include <wdbgexts.h>
#include <dbgeng.h>
#include <dbghelp.h>
// 链接库
#pragma comment(lib, "dbgeng.lib")
#pragma comment(lib, "dbghelp.lib")
// 扩展API声明宏
#define DECLARE_API(extension) \
CPPMOD bool CALLBACK extension(HANDLE hCurrentProcess, HANDLE hCurrentThread, \
ULONG dwCurrentPc, ULONG dwProcessor, PCSTR args)
模块定义文件
; extension.def
EXPORTS
WinDbgExtensionDllInit
ExtensionApiVersion
CheckVersion
hello
showmem
analyzestack
modinfo
memregion
编译脚本
@echo off
echo Building WinDbg Extension...
# 设置环境变量
set INCLUDE=%INCLUDE%;C:\Program Files (x86)\Windows Kits\10\Debuggers\inc
set LIB=%LIB%;C:\Program Files (x86)\Windows Kits\10\Debuggers\lib\x64
# 编译
cl /LD /MD extension.cpp /link /DEF:extension.def /OUT:myextension.dll
if %ERRORLEVEL% == 0 (
echo Build successful!
echo Extension: myextension.dll
) else (
echo Build failed!
)
pause
3. JavaScript扩展开发
3.1 JavaScript扩展基础
基本结构
// myextension.js
"use strict";
// 扩展初始化
function initializeScript() {
return [
new host.apiVersionSupport(1, 3),
new host.functionAlias(analyzeHeap, "analyzeheap"),
new host.functionAlias(showThreads, "showthreads"),
new host.functionAlias(findPattern, "findpattern")
];
}
// 扩展清理
function uninitializeScript() {
// 清理代码
}
// 堆分析函数
function analyzeHeap() {
var control = host.namespace.Debugger.Utility.Control;
host.diagnostics.debugLog("=== Heap Analysis ===\n");
try {
// 执行堆命令
var heapOutput = control.ExecuteCommand("!heap -s");
for (var line of heapOutput) {
host.diagnostics.debugLog(line + "\n");
}
// 分析堆统计
var heapStat = control.ExecuteCommand("!heap -stat -h 0");
host.diagnostics.debugLog("\n=== Heap Statistics ===\n");
for (var line of heapStat) {
host.diagnostics.debugLog(line + "\n");
}
} catch (e) {
host.diagnostics.debugLog("Error: " + e.message + "\n");
}
}
线程分析
// 线程分析函数
function showThreads() {
var control = host.namespace.Debugger.Utility.Control;
host.diagnostics.debugLog("=== Thread Analysis ===\n");
try {
// 获取所有线程
var threads = host.currentProcess.Threads;
for (var thread of threads) {
host.diagnostics.debugLog("Thread ID: " + thread.Id.toString(16) + "\n");
// 切换到线程
control.ExecuteCommand("~" + thread.Id.toString(16) + "s");
// 获取线程堆栈
var stack = control.ExecuteCommand("k");
for (var line of stack) {
host.diagnostics.debugLog(" " + line + "\n");
}
host.diagnostics.debugLog("\n");
}
} catch (e) {
host.diagnostics.debugLog("Error: " + e.message + "\n");
}
}
// 模式查找函数
function findPattern(pattern, startAddr, endAddr) {
if (!pattern) {
host.diagnostics.debugLog("Usage: !findpattern <pattern> [startAddr] [endAddr]\n");
return;
}
var control = host.namespace.Debugger.Utility.Control;
// 设置默认搜索范围
if (!startAddr) startAddr = "0";
if (!endAddr) endAddr = "L?80000000";
try {
var searchCmd = "s -b " + startAddr + " " + endAddr + " " + pattern;
host.diagnostics.debugLog("Searching: " + searchCmd + "\n");
var results = control.ExecuteCommand(searchCmd);
var count = 0;
for (var line of results) {
host.diagnostics.debugLog(line + "\n");
count++;
}
host.diagnostics.debugLog("Found " + count + " matches\n");
} catch (e) {
host.diagnostics.debugLog("Error: " + e.message + "\n");
}
}
3.2 数据模型扩展
对象模型扩展
// 进程扩展
class ProcessExtension {
constructor(process) {
this.__process = process;
}
// 获取堆信息
get HeapInfo() {
var control = host.namespace.Debugger.Utility.Control;
var output = control.ExecuteCommand("!heap -s");
var heaps = [];
for (var line of output) {
if (line.includes("Heap")) {
heaps.push(line.trim());
}
}
return heaps;
}
// 获取模块信息
get ModuleInfo() {
var modules = [];
for (var module of this.__process.Modules) {
modules.push({
Name: module.Name,
BaseAddress: module.BaseAddress,
Size: module.Size
});
}
return modules;
}
// 获取线程CPU时间
get ThreadCpuTime() {
var control = host.namespace.Debugger.Utility.Control;
var output = control.ExecuteCommand("!runaway");
var times = [];
for (var line of output) {
if (line.includes(":")) {
times.push(line.trim());
}
}
return times;
}
}
// 注册扩展
function initializeScript() {
return [
new host.apiVersionSupport(1, 3),
new host.namedModelParent(ProcessExtension, "Debugger.Models.Process")
];
}
自定义可视化
// 自定义数据可视化
class CustomVisualizer {
constructor(obj) {
this.__obj = obj;
}
toString() {
return "Custom Object at " + this.__obj.targetLocation.toString(16);
}
*[Symbol.iterator]() {
// 自定义迭代器
yield new host.indexedValue("Field1", this.__obj.field1);
yield new host.indexedValue("Field2", this.__obj.field2);
yield new host.indexedValue("Field3", this.__obj.field3);
}
}
// 注册可视化器
function initializeScript() {
return [
new host.apiVersionSupport(1, 3),
new host.typeSignatureRegistration(CustomVisualizer, "MyStruct")
];
}
3.3 高级JavaScript技巧
异步操作
// 异步分析函数
async function asyncAnalyze() {
var control = host.namespace.Debugger.Utility.Control;
host.diagnostics.debugLog("Starting async analysis...\n");
try {
// 模拟长时间运行的分析
for (var i = 0; i < 10; i++) {
host.diagnostics.debugLog("Step " + (i + 1) + "/10\n");
// 执行分析步骤
var output = control.ExecuteCommand("!heap -s");
// 处理输出
var lineCount = 0;
for (var line of output) {
lineCount++;
}
host.diagnostics.debugLog("Processed " + lineCount + " lines\n");
// 模拟延迟
await new Promise(resolve => setTimeout(resolve, 100));
}
host.diagnostics.debugLog("Async analysis complete\n");
} catch (e) {
host.diagnostics.debugLog("Error: " + e.message + "\n");
}
}
// 注册异步函数
function initializeScript() {
return [
new host.apiVersionSupport(1, 3),
new host.functionAlias(asyncAnalyze, "asyncanalyze")
];
}
错误处理和日志
// 日志记录类
class Logger {
constructor(logLevel = "INFO") {
this.logLevel = logLevel;
this.levels = { "DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3 };
}
log(level, message) {
if (this.levels[level] >= this.levels[this.logLevel]) {
var timestamp = new Date().toISOString();
host.diagnostics.debugLog("[" + timestamp + "] [" + level + "] " + message + "\n");
}
}
debug(message) { this.log("DEBUG", message); }
info(message) { this.log("INFO", message); }
warn(message) { this.log("WARN", message); }
error(message) { this.log("ERROR", message); }
}
// 全局日志实例
var logger = new Logger("DEBUG");
// 带错误处理的分析函数
function robustAnalyze() {
logger.info("Starting robust analysis");
try {
var control = host.namespace.Debugger.Utility.Control;
// 检查调试目标状态
if (!host.currentProcess) {
throw new Error("No current process");
}
logger.debug("Process ID: " + host.currentProcess.Id);
// 执行分析
var commands = ["!heap -s", "!handle", "lm"];
for (var cmd of commands) {
logger.debug("Executing: " + cmd);
try {
var output = control.ExecuteCommand(cmd);
var lineCount = 0;
for (var line of output) {
lineCount++;
}
logger.info("Command '" + cmd + "' returned " + lineCount + " lines");
} catch (cmdError) {
logger.error("Command '" + cmd + "' failed: " + cmdError.message);
}
}
logger.info("Robust analysis complete");
} catch (e) {
logger.error("Analysis failed: " + e.message);
logger.error("Stack trace: " + e.stack);
}
}
4. Python扩展开发
4.1 PyKD环境搭建
安装PyKD
# 安装PyKD模块
# 方法1:使用pip
pip install pykd
# 方法2:从源码编译
# 下载PyKD源码
# 使用Visual Studio编译
# 方法3:使用预编译版本
# 从GitHub下载预编译的pykd.pyd文件
基本配置
# pykd_config.py
import sys
import os
# 添加PyKD路径
sys.path.append(r"C:\pykd")
# 导入PyKD
try:
import pykd
print("PyKD loaded successfully")
except ImportError as e:
print(f"Failed to load PyKD: {e}")
sys.exit(1)
# 基本配置
pykd.initialize()
4.2 基本Python扩展
简单分析脚本
# heap_analyzer.py
import pykd
import re
class HeapAnalyzer:
def __init__(self):
self.heaps = []
self.total_committed = 0
self.total_reserved = 0
def analyze(self):
"""分析进程堆信息"""
print("=== Heap Analysis ===")
try:
# 执行堆命令
output = pykd.dbgCommand("!heap -s")
# 解析输出
self._parse_heap_output(output)
# 显示统计信息
self._show_statistics()
# 详细分析
self._detailed_analysis()
except Exception as e:
print(f"Error during heap analysis: {e}")
def _parse_heap_output(self, output):
"""解析堆输出"""
lines = output.split('\n')
for line in lines:
if 'Heap' in line and 'at' in line:
# 解析堆信息
match = re.search(r'Heap\s+(\w+)\s+at\s+(\w+)', line)
if match:
heap_id = match.group(1)
heap_addr = match.group(2)
self.heaps.append({
'id': heap_id,
'address': heap_addr,
'line': line.strip()
})
elif 'Total committed' in line:
match = re.search(r'Total committed\s+(\w+)', line)
if match:
self.total_committed = int(match.group(1), 16)
elif 'Total reserved' in line:
match = re.search(r'Total reserved\s+(\w+)', line)
if match:
self.total_reserved = int(match.group(1), 16)
def _show_statistics(self):
"""显示统计信息"""
print(f"\nHeap Statistics:")
print(f"Number of heaps: {len(self.heaps)}")
print(f"Total committed: 0x{self.total_committed:x} ({self.total_committed:,} bytes)")
print(f"Total reserved: 0x{self.total_reserved:x} ({self.total_reserved:,} bytes)")
if self.total_reserved > 0:
usage_percent = (self.total_committed / self.total_reserved) * 100
print(f"Heap usage: {usage_percent:.2f}%")
def _detailed_analysis(self):
"""详细分析每个堆"""
print(f"\nDetailed Heap Analysis:")
for heap in self.heaps:
print(f"\nAnalyzing heap {heap['id']} at {heap['address']}:")
try:
# 获取堆详细信息
cmd = f"!heap -stat -h {heap['address']}"
output = pykd.dbgCommand(cmd)
# 分析堆块分布
self._analyze_heap_blocks(output)
except Exception as e:
print(f" Error analyzing heap {heap['id']}: {e}")
def _analyze_heap_blocks(self, output):
"""分析堆块分布"""
lines = output.split('\n')
blocks = []
for line in lines:
if re.match(r'\s*\w+\s+\w+\s+\w+', line):
parts = line.split()
if len(parts) >= 3:
try:
size = int(parts[0], 16)
count = int(parts[1], 16)
total = int(parts[2], 16)
blocks.append({
'size': size,
'count': count,
'total': total
})
except ValueError:
continue
if blocks:
# 排序并显示最大的块
blocks.sort(key=lambda x: x['total'], reverse=True)
print(" Top heap block sizes:")
for i, block in enumerate(blocks[:5]):
print(f" {i+1}. Size: 0x{block['size']:x}, Count: {block['count']}, Total: 0x{block['total']:x}")
# 使用示例
if __name__ == "__main__":
analyzer = HeapAnalyzer()
analyzer.analyze()
线程分析脚本
# thread_analyzer.py
import pykd
import time
class ThreadAnalyzer:
def __init__(self):
self.threads = []
self.cpu_times = {}
def analyze_threads(self):
"""分析所有线程"""
print("=== Thread Analysis ===")
try:
# 获取线程列表
self._get_thread_list()
# 分析CPU使用
self._analyze_cpu_usage()
# 分析线程堆栈
self._analyze_thread_stacks()
# 检测死锁
self._detect_deadlocks()
except Exception as e:
print(f"Error during thread analysis: {e}")
def _get_thread_list(self):
"""获取线程列表"""
output = pykd.dbgCommand("~")
lines = output.split('\n')
for line in lines:
if line.strip() and not line.startswith('.'):
parts = line.split()
if len(parts) >= 2:
thread_id = parts[0].replace('~', '').replace('*', '')
if thread_id.isdigit():
self.threads.append({
'id': int(thread_id),
'line': line.strip()
})
print(f"Found {len(self.threads)} threads")
def _analyze_cpu_usage(self):
"""分析CPU使用情况"""
print("\nCPU Usage Analysis:")
try:
output = pykd.dbgCommand("!runaway")
lines = output.split('\n')
for line in lines:
if ':' in line and 'ms' in line:
parts = line.split(':')
if len(parts) >= 2:
thread_info = parts[0].strip()
time_info = parts[1].strip()
print(f" {thread_info}: {time_info}")
except Exception as e:
print(f" Error getting CPU usage: {e}")
def _analyze_thread_stacks(self):
"""分析线程堆栈"""
print("\nThread Stack Analysis:")
for thread in self.threads:
try:
print(f"\nThread {thread['id']}:")
# 切换到线程
pykd.dbgCommand(f"~{thread['id']}s")
# 获取堆栈
stack_output = pykd.dbgCommand("k")
stack_lines = stack_output.split('\n')
# 分析堆栈深度
stack_depth = len([line for line in stack_lines if line.strip() and not line.startswith('#')])
print(f" Stack depth: {stack_depth}")
# 显示前几层堆栈
print(" Top stack frames:")
for i, line in enumerate(stack_lines[:5]):
if line.strip() and not line.startswith('#'):
print(f" {line.strip()}")
# 检查是否在等待
self._check_thread_wait_state(thread['id'])
except Exception as e:
print(f" Error analyzing thread {thread['id']}: {e}")
def _check_thread_wait_state(self, thread_id):
"""检查线程等待状态"""
try:
# 获取线程详细信息
output = pykd.dbgCommand("!thread")
if "Waiting" in output:
print(f" Thread {thread_id} is in waiting state")
# 尝试获取等待对象信息
if "WaitForSingleObject" in output or "WaitForMultipleObjects" in output:
print(f" Thread {thread_id} is waiting for object(s)")
except Exception as e:
pass # 忽略错误
def _detect_deadlocks(self):
"""检测死锁"""
print("\nDeadlock Detection:")
try:
# 检查临界区
cs_output = pykd.dbgCommand("!cs -l")
if "No critical section" in cs_output:
print(" No locked critical sections found")
else:
print(" Locked critical sections detected:")
lines = cs_output.split('\n')
for line in lines:
if line.strip() and not line.startswith('!'):
print(f" {line.strip()}")
# 尝试自动死锁检测
deadlock_output = pykd.dbgCommand("!analyze -hang")
if "DEADLOCK" in deadlock_output.upper():
print(" Potential deadlock detected!")
except Exception as e:
print(f" Error during deadlock detection: {e}")
# 使用示例
if __name__ == "__main__":
analyzer = ThreadAnalyzer()
analyzer.analyze_threads()
4.3 高级Python扩展
内存泄漏检测器
# memory_leak_detector.py
import pykd
import time
import json
from collections import defaultdict
class MemoryLeakDetector:
def __init__(self):
self.snapshots = []
self.leak_candidates = []
def take_snapshot(self, name=""):
"""拍摄内存快照"""
print(f"Taking memory snapshot: {name}")
snapshot = {
'name': name or f"snapshot_{len(self.snapshots)}",
'timestamp': time.time(),
'heap_info': self._get_heap_info(),
'handle_info': self._get_handle_info(),
'module_info': self._get_module_info()
}
self.snapshots.append(snapshot)
print(f"Snapshot taken: {snapshot['name']}")
return snapshot
def _get_heap_info(self):
"""获取堆信息"""
heap_info = {
'heaps': [],
'total_committed': 0,
'total_reserved': 0
}
try:
output = pykd.dbgCommand("!heap -s")
lines = output.split('\n')
for line in lines:
if 'Heap' in line and 'at' in line:
heap_info['heaps'].append(line.strip())
elif 'Total committed' in line:
match = re.search(r'Total committed\s+(\w+)', line)
if match:
heap_info['total_committed'] = int(match.group(1), 16)
elif 'Total reserved' in line:
match = re.search(r'Total reserved\s+(\w+)', line)
if match:
heap_info['total_reserved'] = int(match.group(1), 16)
except Exception as e:
print(f"Error getting heap info: {e}")
return heap_info
def _get_handle_info(self):
"""获取句柄信息"""
handle_info = {
'total_handles': 0,
'handle_types': defaultdict(int)
}
try:
output = pykd.dbgCommand("!handle")
lines = output.split('\n')
for line in lines:
if 'Handle' in line and 'Type' in line:
parts = line.split()
if len(parts) >= 4:
handle_type = parts[3]
handle_info['handle_types'][handle_type] += 1
handle_info['total_handles'] += 1
except Exception as e:
print(f"Error getting handle info: {e}")
return handle_info
def _get_module_info(self):
"""获取模块信息"""
module_info = {
'modules': [],
'total_size': 0
}
try:
output = pykd.dbgCommand("lm")
lines = output.split('\n')
for line in lines:
if line.strip() and not line.startswith('start'):
parts = line.split()
if len(parts) >= 3:
start_addr = parts[0]
end_addr = parts[1]
module_name = parts[2]
try:
start = int(start_addr, 16)
end = int(end_addr, 16)
size = end - start
module_info['modules'].append({
'name': module_name,
'start': start,
'end': end,
'size': size
})
module_info['total_size'] += size
except ValueError:
continue
except Exception as e:
print(f"Error getting module info: {e}")
return module_info
def compare_snapshots(self, snapshot1_idx=0, snapshot2_idx=-1):
"""比较两个快照"""
if len(self.snapshots) < 2:
print("Need at least 2 snapshots to compare")
return
snap1 = self.snapshots[snapshot1_idx]
snap2 = self.snapshots[snapshot2_idx]
print(f"\nComparing snapshots:")
print(f" {snap1['name']} vs {snap2['name']}")
# 比较堆信息
self._compare_heap_info(snap1['heap_info'], snap2['heap_info'])
# 比较句柄信息
self._compare_handle_info(snap1['handle_info'], snap2['handle_info'])
# 比较模块信息
self._compare_module_info(snap1['module_info'], snap2['module_info'])
def _compare_heap_info(self, heap1, heap2):
"""比较堆信息"""
print("\nHeap Comparison:")
committed_diff = heap2['total_committed'] - heap1['total_committed']
reserved_diff = heap2['total_reserved'] - heap1['total_reserved']
print(f" Committed memory change: {committed_diff:+,} bytes")
print(f" Reserved memory change: {reserved_diff:+,} bytes")
if committed_diff > 1024 * 1024: # 1MB
print(f" WARNING: Large increase in committed memory!")
self.leak_candidates.append({
'type': 'heap_growth',
'size': committed_diff,
'description': f"Heap committed memory increased by {committed_diff:,} bytes"
})
def _compare_handle_info(self, handle1, handle2):
"""比较句柄信息"""
print("\nHandle Comparison:")
handle_diff = handle2['total_handles'] - handle1['total_handles']
print(f" Total handle change: {handle_diff:+}")
if handle_diff > 100:
print(f" WARNING: Large increase in handle count!")
self.leak_candidates.append({
'type': 'handle_leak',
'count': handle_diff,
'description': f"Handle count increased by {handle_diff}"
})
# 比较句柄类型
for handle_type in handle2['handle_types']:
count1 = handle1['handle_types'].get(handle_type, 0)
count2 = handle2['handle_types'][handle_type]
diff = count2 - count1
if diff > 0:
print(f" {handle_type}: +{diff}")
def _compare_module_info(self, module1, module2):
"""比较模块信息"""
print("\nModule Comparison:")
size_diff = module2['total_size'] - module1['total_size']
print(f" Total module size change: {size_diff:+,} bytes")
# 查找新加载的模块
old_modules = {mod['name'] for mod in module1['modules']}
new_modules = {mod['name'] for mod in module2['modules']}
added_modules = new_modules - old_modules
removed_modules = old_modules - new_modules
if added_modules:
print(f" Added modules: {', '.join(added_modules)}")
if removed_modules:
print(f" Removed modules: {', '.join(removed_modules)}")
def generate_report(self):
"""生成泄漏报告"""
print("\n=== Memory Leak Detection Report ===")
if not self.leak_candidates:
print("No memory leaks detected.")
return
print(f"Found {len(self.leak_candidates)} potential leak(s):")
for i, leak in enumerate(self.leak_candidates, 1):
print(f"\n{i}. {leak['type'].upper()}:")
print(f" {leak['description']}")
if leak['type'] == 'heap_growth':
print(f" Recommendation: Check for memory allocations without corresponding frees")
elif leak['type'] == 'handle_leak':
print(f" Recommendation: Check for unclosed handles (files, registry keys, etc.)")
def save_snapshots(self, filename):
"""保存快照到文件"""
try:
with open(filename, 'w') as f:
json.dump(self.snapshots, f, indent=2)
print(f"Snapshots saved to {filename}")
except Exception as e:
print(f"Error saving snapshots: {e}")
def load_snapshots(self, filename):
"""从文件加载快照"""
try:
with open(filename, 'r') as f:
self.snapshots = json.load(f)
print(f"Snapshots loaded from {filename}")
except Exception as e:
print(f"Error loading snapshots: {e}")
# 使用示例
if __name__ == "__main__":
detector = MemoryLeakDetector()
# 拍摄初始快照
detector.take_snapshot("initial")
print("\nPerform operations that might cause memory leaks...")
print("Press Enter when ready to take second snapshot")
input()
# 拍摄第二个快照
detector.take_snapshot("after_operations")
# 比较快照
detector.compare_snapshots()
# 生成报告
detector.generate_report()
# 保存快照
detector.save_snapshots("memory_snapshots.json")
5. 扩展部署和分发
5.1 扩展打包
传统扩展打包
@echo off
echo Packaging WinDbg Extension...
# 创建发布目录
mkdir release
mkdir release\x64
mkdir release\x86
mkdir release\docs
# 复制文件
copy x64\Release\myextension.dll release\x64\
copy x86\Release\myextension.dll release\x86\
copy README.md release\docs\
copy CHANGELOG.md release\docs\
# 创建安装脚本
echo Creating install script...
(
echo @echo off
echo echo Installing MyExtension...
echo.
echo # 检测WinDbg安装路径
echo for /f "tokens=2*" %%%%a in ^('reg query "HKLM\SOFTWARE\Microsoft\Windows Kits\Installed Roots" /v KitsRoot10 2^>nul'^) do set KITS_ROOT=%%%%b
echo.
echo if not defined KITS_ROOT ^(
echo echo Windows SDK not found!
echo pause
echo exit /b 1
echo ^\)
echo.
echo # 复制扩展文件
echo copy x64\myextension.dll "%%KITS_ROOT%%Debuggers\x64\"
echo copy x86\myextension.dll "%%KITS_ROOT%%Debuggers\x86\"
echo.
echo echo Installation complete!
echo pause
) > release\install.bat
echo Package created in release\ directory
pause
JavaScript扩展打包
// package.js - 扩展打包脚本
const fs = require('fs');
const path = require('path');
const archiver = require('archiver');
function packageExtension() {
const packageInfo = {
name: "MyJSExtension",
version: "1.0.0",
description: "Advanced debugging extension for WinDbg",
author: "Your Name",
files: [
"myextension.js",
"utils.js",
"README.md",
"LICENSE"
]
};
// 创建package.json
fs.writeFileSync('package.json', JSON.stringify(packageInfo, null, 2));
// 创建ZIP包
const output = fs.createWriteStream('myextension-1.0.0.zip');
const archive = archiver('zip', { zlib: { level: 9 } });
output.on('close', () => {
console.log(`Package created: ${archive.pointer()} bytes`);
});
archive.on('error', (err) => {
throw err;
});
archive.pipe(output);
// 添加文件
packageInfo.files.forEach(file => {
if (fs.existsSync(file)) {
archive.file(file, { name: file });
}
});
archive.finalize();
}
packageExtension();
5.2 扩展安装
自动安装脚本
# install_extension.ps1
param(
[Parameter(Mandatory=$true)]
[string]$ExtensionPath,
[Parameter(Mandatory=$false)]
[string]$Architecture = "x64"
)
Write-Host "Installing WinDbg Extension..." -ForegroundColor Green
# 查找WinDbg安装路径
$windbgPaths = @(
"${env:ProgramFiles(x86)}\Windows Kits\10\Debuggers\$Architecture",
"${env:ProgramFiles}\Windows Kits\10\Debuggers\$Architecture",
"${env:ProgramFiles(x86)}\Windows Kits\8.1\Debuggers\$Architecture",
"${env:ProgramFiles}\Windows Kits\8.1\Debuggers\$Architecture"
)
$windbgPath = $null
foreach ($path in $windbgPaths) {
if (Test-Path "$path\windbg.exe") {
$windbgPath = $path
break
}
}
if (-not $windbgPath) {
Write-Error "WinDbg not found. Please install Windows SDK."
exit 1
}
Write-Host "Found WinDbg at: $windbgPath" -ForegroundColor Yellow
# 复制扩展文件
try {
if (Test-Path $ExtensionPath) {
$fileName = Split-Path $ExtensionPath -Leaf
$destination = Join-Path $windbgPath $fileName
Copy-Item $ExtensionPath $destination -Force
Write-Host "Extension installed: $destination" -ForegroundColor Green
# 验证安装
if (Test-Path $destination) {
Write-Host "Installation verified successfully!" -ForegroundColor Green
} else {
Write-Error "Installation verification failed!"
}
} else {
Write-Error "Extension file not found: $ExtensionPath"
}
} catch {
Write-Error "Installation failed: $($_.Exception.Message)"
}
Write-Host "Installation complete!" -ForegroundColor Green
5.3 扩展管理
扩展管理器
# extension_manager.py
import os
import json
import shutil
import winreg
from pathlib import Path
class WinDbgExtensionManager:
def __init__(self):
self.windbg_paths = self._find_windbg_paths()
self.extensions_config = self._load_extensions_config()
def _find_windbg_paths(self):
"""查找WinDbg安装路径"""
paths = []
try:
# 从注册表查找
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
r"SOFTWARE\Microsoft\Windows Kits\Installed Roots") as key:
kits_root, _ = winreg.QueryValueEx(key, "KitsRoot10")
for arch in ["x64", "x86"]:
debugger_path = os.path.join(kits_root, "Debuggers", arch)
if os.path.exists(os.path.join(debugger_path, "windbg.exe")):
paths.append({
'path': debugger_path,
'architecture': arch,
'version': self._get_windbg_version(debugger_path)
})
except Exception as e:
print(f"Error finding WinDbg paths: {e}")
return paths
def _get_windbg_version(self, path):
"""获取WinDbg版本"""
try:
windbg_exe = os.path.join(path, "windbg.exe")
if os.path.exists(windbg_exe):
# 简化版本检测
return "10.0"
except:
pass
return "Unknown"
def _load_extensions_config(self):
"""加载扩展配置"""
config_file = "extensions.json"
if os.path.exists(config_file):
try:
with open(config_file, 'r') as f:
return json.load(f)
except Exception as e:
print(f"Error loading config: {e}")
return {'extensions': []}
def _save_extensions_config(self):
"""保存扩展配置"""
try:
with open("extensions.json", 'w') as f:
json.dump(self.extensions_config, f, indent=2)
except Exception as e:
print(f"Error saving config: {e}")
def install_extension(self, extension_path, name=None, description=None):
"""安装扩展"""
if not os.path.exists(extension_path):
print(f"Extension file not found: {extension_path}")
return False
extension_name = name or os.path.splitext(os.path.basename(extension_path))[0]
print(f"Installing extension: {extension_name}")
success_count = 0
for windbg_info in self.windbg_paths:
try:
dest_path = os.path.join(windbg_info['path'], os.path.basename(extension_path))
shutil.copy2(extension_path, dest_path)
print(f" Installed to {windbg_info['architecture']}: {dest_path}")
success_count += 1
except Exception as e:
print(f" Failed to install to {windbg_info['architecture']}: {e}")
if success_count > 0:
# 更新配置
extension_info = {
'name': extension_name,
'file': os.path.basename(extension_path),
'description': description or f"Extension {extension_name}",
'installed_paths': [info['path'] for info in self.windbg_paths],
'type': self._detect_extension_type(extension_path)
}
self.extensions_config['extensions'].append(extension_info)
self._save_extensions_config()
print(f"Extension {extension_name} installed successfully!")
return True
return False
def _detect_extension_type(self, extension_path):
"""检测扩展类型"""
ext = os.path.splitext(extension_path)[1].lower()
if ext == '.dll':
return 'native'
elif ext == '.js':
return 'javascript'
elif ext == '.py':
return 'python'
else:
return 'unknown'
def uninstall_extension(self, extension_name):
"""卸载扩展"""
extension_info = None
for ext in self.extensions_config['extensions']:
if ext['name'] == extension_name:
extension_info = ext
break
if not extension_info:
print(f"Extension {extension_name} not found")
return False
print(f"Uninstalling extension: {extension_name}")
success_count = 0
for path in extension_info['installed_paths']:
try:
file_path = os.path.join(path, extension_info['file'])
if os.path.exists(file_path):
os.remove(file_path)
print(f" Removed from: {path}")
success_count += 1
except Exception as e:
print(f" Failed to remove from {path}: {e}")
if success_count > 0:
# 从配置中移除
self.extensions_config['extensions'].remove(extension_info)
self._save_extensions_config()
print(f"Extension {extension_name} uninstalled successfully!")
return True
return False
def list_extensions(self):
"""列出已安装的扩展"""
print("Installed Extensions:")
if not self.extensions_config['extensions']:
print(" No extensions installed")
return
for ext in self.extensions_config['extensions']:
print(f"\n Name: {ext['name']}")
print(f" Type: {ext['type']}")
print(f" File: {ext['file']}")
print(f" Description: {ext['description']}")
print(f" Installed paths: {len(ext['installed_paths'])}")
def verify_installations(self):
"""验证扩展安装"""
print("Verifying extension installations...")
for ext in self.extensions_config['extensions']:
print(f"\nVerifying {ext['name']}:")
for path in ext['installed_paths']:
file_path = os.path.join(path, ext['file'])
if os.path.exists(file_path):
print(f" ✓ {path}")
else:
print(f" ✗ {path} (missing)")
# 使用示例
if __name__ == "__main__":
manager = WinDbgExtensionManager()
print("WinDbg Extension Manager")
print("========================")
# 显示WinDbg安装信息
print(f"\nFound {len(manager.windbg_paths)} WinDbg installation(s):")
for info in manager.windbg_paths:
print(f" {info['architecture']}: {info['path']} (v{info['version']})")
# 列出已安装的扩展
manager.list_extensions()
# 验证安装
manager.verify_installations()
6. 扩展调试和测试
6.1 扩展调试技巧
调试输出
// 调试宏定义
#ifdef _DEBUG
#define DBG_PRINT(fmt, ...) dprintf("[DEBUG] " fmt, __VA_ARGS__)
#define DBG_ENTER() dprintf("[DEBUG] Entering %s\n", __FUNCTION__)
#define DBG_EXIT() dprintf("[DEBUG] Exiting %s\n", __FUNCTION__)
#else
#define DBG_PRINT(fmt, ...)
#define DBG_ENTER()
#define DBG_EXIT()
#endif
// 使用示例
DECLARE_API(debug_example)
{
DBG_ENTER();
DBG_PRINT("Processing arguments: %s\n", args ? args : "(none)");
// 扩展逻辑
if (args && *args)
{
ULONG64 address = GetExpression(args);
DBG_PRINT("Parsed address: 0x%I64x\n", address);
if (address != 0)
{
// 处理地址
dprintf("Processing address 0x%I64x\n", address);
}
else
{
DBG_PRINT("Invalid address expression: %s\n", args);
dprintf("Error: Invalid address\n");
}
}
DBG_EXIT();
}
错误处理
// 错误处理宏
#define SAFE_CALL(func, ...) \
do { \
HRESULT hr = func(__VA_ARGS__); \
if (FAILED(hr)) { \
dprintf("Error in %s: 0x%08x\n", #func, hr); \
return; \
} \
} while(0)
#define SAFE_READ_MEMORY(addr, buffer, size) \
do { \
ULONG bytesRead; \
if (!ReadMemory(addr, buffer, size, &bytesRead) || bytesRead != size) { \
dprintf("Failed to read memory at 0x%I64x\n", addr); \
return; \
} \
} while(0)
// 使用示例
DECLARE_API(safe_example)
{
if (!args || !*args)
{
dprintf("Usage: !safe_example <address>\n");
return;
}
ULONG64 address = GetExpression(args);
if (address == 0)
{
dprintf("Error: Invalid address expression\n");
return;
}
// 安全读取内存
UCHAR buffer[16];
SAFE_READ_MEMORY(address, buffer, sizeof(buffer));
// 处理数据
dprintf("Data at 0x%I64x: ", address);
for (int i = 0; i < sizeof(buffer); i++)
{
dprintf("%02x ", buffer[i]);
}
dprintf("\n");
}
6.2 单元测试
扩展测试框架
// test_framework.h
#pragma once
#include <vector>
#include <string>
#include <functional>
class ExtensionTestFramework
{
public:
struct TestCase
{
std::string name;
std::function<bool()> test_func;
std::string description;
};
static ExtensionTestFramework& Instance()
{
static ExtensionTestFramework instance;
return instance;
}
void RegisterTest(const std::string& name, std::function<bool()> test_func, const std::string& description = "")
{
tests_.push_back({name, test_func, description});
}
void RunAllTests()
{
dprintf("Running %d test(s)...\n", static_cast<int>(tests_.size()));
int passed = 0;
int failed = 0;
for (const auto& test : tests_)
{
dprintf("\nRunning test: %s\n", test.name.c_str());
if (!test.description.empty())
{
dprintf("Description: %s\n", test.description.c_str());
}
try
{
if (test.test_func())
{
dprintf("PASSED: %s\n", test.name.c_str());
passed++;
}
else
{
dprintf("FAILED: %s\n", test.name.c_str());
failed++;
}
}
catch (...)
{
dprintf("EXCEPTION: %s\n", test.name.c_str());
failed++;
}
}
dprintf("\nTest Results: %d passed, %d failed\n", passed, failed);
}
private:
std::vector<TestCase> tests_;
};
// 测试宏
#define REGISTER_TEST(name, func, desc) \
ExtensionTestFramework::Instance().RegisterTest(name, func, desc)
#define ASSERT_TRUE(condition) \
do { \
if (!(condition)) { \
dprintf("Assertion failed: %s\n", #condition); \
return false; \
} \
} while(0)
#define ASSERT_EQ(expected, actual) \
do { \
if ((expected) != (actual)) { \
dprintf("Assertion failed: expected %s, got %s\n", #expected, #actual); \
return false; \
} \
} while(0)
测试用例实现
// tests.cpp
#include "test_framework.h"
// 测试内存读取功能
bool TestMemoryRead()
{
// 获取一个已知的有效地址
ULONG64 address = GetExpression("@eip");
ASSERT_TRUE(address != 0);
// 尝试读取内存
UCHAR buffer[4];
ULONG bytesRead;
BOOL result = ReadMemory(address, buffer, sizeof(buffer), &bytesRead);
ASSERT_TRUE(result);
ASSERT_EQ(sizeof(buffer), bytesRead);
return true;
}
// 测试符号解析功能
bool TestSymbolResolution()
{
// 获取当前指令地址
ULONG64 address = GetExpression("@eip");
ASSERT_TRUE(address != 0);
// 尝试解析符号
CHAR symbolName[256];
ULONG64 displacement;
if (ResolveSymbol(address, symbolName, sizeof(symbolName), &displacement))
{
dprintf("Symbol: %s+0x%I64x\n", symbolName, displacement);
return true;
}
// 符号解析失败不一定是错误(可能没有符号)
dprintf("No symbol found for address 0x%I64x\n", address);
return true;
}
// 测试表达式求值
bool TestExpressionEvaluation()
{
// 测试简单表达式
ULONG64 result1 = GetExpression("1+1");
ASSERT_EQ(2, result1);
// 测试十六进制
ULONG64 result2 = GetExpression("0x10");
ASSERT_EQ(16, result2);
// 测试寄存器
ULONG64 eip = GetExpression("@eip");
ASSERT_TRUE(eip != 0);
return true;
}
// 注册所有测试
void RegisterAllTests()
{
REGISTER_TEST("MemoryRead", TestMemoryRead, "Test memory reading functionality");
REGISTER_TEST("SymbolResolution", TestSymbolResolution, "Test symbol resolution");
REGISTER_TEST("ExpressionEvaluation", TestExpressionEvaluation, "Test expression evaluation");
}
// 运行测试的扩展命令
DECLARE_API(runtests)
{
dprintf("=== Extension Unit Tests ===\n");
RegisterAllTests();
ExtensionTestFramework::Instance().RunAllTests();
}
6.3 性能测试
性能测量工具
// performance.h
#pragma once
#include <windows.h>
class PerformanceTimer
{
public:
PerformanceTimer()
{
QueryPerformanceFrequency(&frequency_);
Reset();
}
void Reset()
{
QueryPerformanceCounter(&start_time_);
}
double ElapsedMilliseconds()
{
LARGE_INTEGER end_time;
QueryPerformanceCounter(&end_time);
double elapsed = static_cast<double>(end_time.QuadPart - start_time_.QuadPart);
return (elapsed * 1000.0) / frequency_.QuadPart;
}
double ElapsedMicroseconds()
{
return ElapsedMilliseconds() * 1000.0;
}
private:
LARGE_INTEGER frequency_;
LARGE_INTEGER start_time_;
};
// 性能测试宏
#define PERF_TEST_BEGIN(name) \
do { \
dprintf("Performance test: %s\n", name); \
PerformanceTimer timer; \
timer.Reset();
#define PERF_TEST_END() \
double elapsed = timer.ElapsedMilliseconds(); \
dprintf("Elapsed time: %.3f ms\n", elapsed); \
} while(0)
性能测试示例
// 测试内存读取性能
DECLARE_API(perftest)
{
dprintf("=== Performance Tests ===\n");
// 测试内存读取性能
PERF_TEST_BEGIN("Memory Read (1000 iterations)");
{
ULONG64 address = GetExpression("@eip");
UCHAR buffer[16];
for (int i = 0; i < 1000; i++)
{
ReadMemory(address, buffer, sizeof(buffer), NULL);
}
}
PERF_TEST_END();
// 测试符号解析性能
PERF_TEST_BEGIN("Symbol Resolution (100 iterations)");
{
ULONG64 address = GetExpression("@eip");
CHAR symbolName[256];
ULONG64 displacement;
for (int i = 0; i < 100; i++)
{
ResolveSymbol(address, symbolName, sizeof(symbolName), &displacement);
}
}
PERF_TEST_END();
// 测试表达式求值性能
PERF_TEST_BEGIN("Expression Evaluation (1000 iterations)");
{
for (int i = 0; i < 1000; i++)
{
GetExpression("@eip");
}
}
PERF_TEST_END();
}
7. 最佳实践和注意事项
7.1 开发最佳实践
代码规范
// 1. 使用一致的命名约定
// 函数名:PascalCase
// 变量名:camelCase
// 常量:UPPER_CASE
// 扩展命令:lowercase
// 2. 添加详细注释
/**
* @brief 分析进程堆信息
* @param args 命令行参数,可选的堆地址
* @return 无返回值
* @note 如果未提供参数,分析所有堆
*/
DECLARE_API(analyzeheap)
{
// 实现代码
}
// 3. 使用错误处理
// 总是检查API调用的返回值
// 提供有意义的错误消息
// 优雅地处理异常情况
// 4. 内存管理
// 及时释放分配的内存
// 避免内存泄漏
// 使用RAII模式
安全考虑
// 1. 输入验证
DECLARE_API(secure_example)
{
// 检查参数
if (!args || !*args)
{
dprintf("Usage: !secure_example <address>\n");
return;
}
// 验证地址范围
ULONG64 address = GetExpression(args);
if (address == 0 || address == BADADDR)
{
dprintf("Error: Invalid address\n");
return;
}
// 检查内存可访问性
if (!IsAddressValid(address))
{
dprintf("Error: Address not accessible\n");
return;
}
// 安全处理
}
// 2. 缓冲区保护
void SafeStringCopy(char* dest, size_t destSize, const char* src)
{
if (dest && src && destSize > 0)
{
strncpy_s(dest, destSize, src, _TRUNCATE);
}
}
// 3. 异常处理
DECLARE_API(exception_safe)
{
__try
{
// 可能抛出异常的代码
ULONG64 address = GetExpression(args);
UCHAR buffer[16];
ReadMemory(address, buffer, sizeof(buffer), NULL);
}
__except(EXCEPTION_EXECUTE_HANDLER)
{
dprintf("Exception occurred during operation\n");
}
}
7.2 性能优化
缓存策略
// 符号缓存
class SymbolCache
{
private:
std::map<ULONG64, std::string> cache_;
static const size_t MAX_CACHE_SIZE = 1000;
public:
bool GetSymbol(ULONG64 address, std::string& symbol)
{
auto it = cache_.find(address);
if (it != cache_.end())
{
symbol = it->second;
return true;
}
// 查找符号
CHAR symbolName[256];
ULONG64 displacement;
if (ResolveSymbol(address, symbolName, sizeof(symbolName), &displacement))
{
symbol = std::string(symbolName) + "+0x" + std::to_string(displacement);
// 添加到缓存
if (cache_.size() < MAX_CACHE_SIZE)
{
cache_[address] = symbol;
}
return true;
}
return false;
}
void ClearCache()
{
cache_.clear();
}
};
// 全局符号缓存
static SymbolCache g_symbolCache;
批量操作优化
// 批量内存读取
class BatchMemoryReader
{
public:
struct ReadRequest
{
ULONG64 address;
PVOID buffer;
ULONG size;
PULONG bytesRead;
};
bool BatchRead(const std::vector<ReadRequest>& requests)
{
bool allSuccess = true;
for (const auto& req : requests)
{
if (!ReadMemory(req.address, req.buffer, req.size, req.bytesRead))
{
allSuccess = false;
if (req.bytesRead)
{
*req.bytesRead = 0;
}
}
}
return allSuccess;
}
};
7.3 兼容性考虑
版本兼容性
// 检查WinDbg版本
bool CheckWinDbgVersion(USHORT requiredMajor, USHORT requiredMinor)
{
LPEXT_API_VERSION apiVersion = ExtensionApiVersion();
if (apiVersion->MajorVersion > requiredMajor ||
(apiVersion->MajorVersion == requiredMajor && apiVersion->MinorVersion >= requiredMinor))
{
return true;
}
dprintf("Warning: This extension requires WinDbg version %d.%d or higher\n",
requiredMajor, requiredMinor);
return false;
}
// 条件功能
DECLARE_API(version_aware)
{
if (CheckWinDbgVersion(10, 0))
{
// 使用新功能
dprintf("Using advanced features\n");
}
else
{
// 使用兼容功能
dprintf("Using compatibility mode\n");
}
}
架构兼容性
// 检查目标架构
bool IsTarget64Bit()
{
return (ExtensionApis.lpGetExpression("@$ptrsize") == 8);
}
DECLARE_API(arch_aware)
{
if (IsTarget64Bit())
{
dprintf("Target is 64-bit\n");
// 64位特定处理
}
else
{
dprintf("Target is 32-bit\n");
// 32位特定处理
}
}
8. 总结
WinDbg扩展开发是一个强大的调试增强技术,通过本章的学习,您应该掌握:
8.1 核心技能
- 传统C/C++扩展开发
- JavaScript扩展开发
- Python扩展开发
- 扩展部署和管理
8.2 最佳实践
- 代码规范和安全考虑
- 性能优化技巧
- 兼容性处理
- 测试和调试方法
8.3 进阶方向
- 复杂数据结构可视化
- 自动化分析工具
- 企业级扩展管理
- 与其他调试工具集成
通过扩展开发,您可以大大提高调试效率,创建专门针对特定应用或问题域的调试工具,使WinDbg成为更加强大和个性化的调试平台。