查找磁盘中具备RWX区段的DLL

·

安装依赖库

pip install pefile colorama tqdm

完整代码

import os
import sys
import argparse
import pefile
from colorama import init, Fore, Style
from concurrent.futures import ThreadPoolExecutor, as_completed
# 尝试导入 tqdm 用于显示进度条
try:
    from tqdm import tqdm
except ImportError:
    print(f"{Fore.RED}[!] 缺少 tqdm 库。请运行 'pip install tqdm' 安装。")
    sys.exit(1)

# Initialize colorama
init(autoreset=True)

# PE Section Characteristics Flags
IMAGE_SCN_MEM_EXECUTE = 0x20000000
IMAGE_SCN_MEM_READ = 0x40000000
IMAGE_SCN_MEM_WRITE = 0x80000000

def get_arch(pe):
    """
    Determine the architecture of the PE file.
    """
    if pe.FILE_HEADER.Machine == 0x8664:
        return "x64"
    elif pe.FILE_HEADER.Machine == 0x014c:
        return "x86"
    return "Unknown"

def is_signed(pe):
    """
    Check if the DLL has a digital signature by inspecting the Security Directory.
    """
    try:
        # IMAGE_DIRECTORY_ENTRY_SECURITY is index 4
        security_dir = pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']]
        
        # If VirtualAddress is non-zero and Size is non-zero, a signature exists
        if security_dir.VirtualAddress != 0 and security_dir.Size > 0:
            return True
    except IndexError:
        pass
    except AttributeError:
        pass
    
    return False

def analyze_dll(filepath, target_arch="x64"):
    """
    Analyze a single DLL file for RWX sections and signatures.
    Returns a dictionary if RWX found and arch matches, else None.
    """
    try:
        # Load the PE file
        # fast_load=True parses headers but skips imports/exports
        pe = pefile.PE(filepath, fast_load=True)
    except Exception:
        # 静默失败:无法打开的文件、权限错误或非PE文件都不打印错误
        return None

    # 1. Check Architecture first
    current_arch = get_arch(pe)
    
    if target_arch != "all" and current_arch != target_arch:
        pe.close()
        return None

    rwx_sections = []
    
    # 2. Iterate through sections to find RWX
    for section in pe.sections:
        characteristics = section.Characteristics
        
        # Check for R, W, and X flags
        is_r = characteristics & IMAGE_SCN_MEM_READ
        is_w = characteristics & IMAGE_SCN_MEM_WRITE
        is_x = characteristics & IMAGE_SCN_MEM_EXECUTE
        
        if is_r and is_w and is_x:
            # Decode section name (bytes to string) and strip null bytes
            sec_name = section.Name.decode('utf-8', errors='ignore').strip('\x00')
            sec_size = section.Misc_VirtualSize 
            rwx_sections.append({
                "name": sec_name,
                "size": sec_size
            })

    if rwx_sections:
        result = {
            "path": filepath,
            "arch": current_arch,
            "signed": is_signed(pe),
            "rwx_sections": rwx_sections
        }
        pe.close()
        return result
    
    pe.close()
    return None

def format_size(size_bytes):
    """
    Format bytes into KB or MB for display.
    """
    if size_bytes >= 1024 * 1024:
        return f"{size_bytes / (1024 * 1024):.2f} MB"
    elif size_bytes >= 1024:
        return f"{size_bytes / 1024:.2f} KB"
    else:
        return f"{size_bytes} Bytes"

def print_result(result):
    """
    Format and print the result using tqdm.write to avoid breaking the progress bar.
    """
    # Output formatting
    arch_color = Fore.MAGENTA if result['arch'] == "x64" else Fore.YELLOW
    sign_color = Fore.BLUE if result['signed'] else Fore.RED
    sign_text = "SIGNED" if result['signed'] else "UNSIGNED"
    
    output = []
    output.append(f"{Fore.WHITE}DLL: {result['path']}")
    output.append(f"  Arch: {arch_color}{result['arch']}{Style.RESET_ALL} | Signature: {sign_color}{sign_text}{Style.RESET_ALL}")
    
    for sec in result['rwx_sections']:
        size_str = format_size(sec['size'])
        
        # Color code size based on "juiciness"
        size_color = Fore.GREEN
        if sec['size'] < 1024 * 100: # < 100KB
            size_color = Fore.WHITE
        elif sec['size'] < 1024 * 1024: # < 1MB
            size_color = Fore.CYAN
        
        output.append(f"  -> RWX Section: {Fore.RED}{sec['name']}{Style.RESET_ALL} | Size: {size_color}{size_str}")
    
    output.append("-" * 50)
    
    # Use tqdm.write to print safely above the progress bar
    tqdm.write("\n".join(output))

def scan_directory(target_dir, max_workers=20, target_arch="x64"):
    """
    Recursively scan the directory for DLLs using multithreading.
    """
    print(f"{Fore.CYAN}[*] 正在索引 DLL 文件: {target_dir} ...")
    
    dll_files = []
    # 第一步:快速遍历收集所有 DLL 文件路径
    for root, dirs, files in os.walk(target_dir):
        for file in files:
            if file.lower().endswith('.dll'):
                dll_files.append(os.path.join(root, file))
    
    total_files = len(dll_files)
    print(f"{Fore.CYAN}[*] 找到 {total_files} 个 DLL 文件。")
    print(f"{Fore.CYAN}[*] 目标架构: {Fore.YELLOW}{target_arch}")
    print(f"{Fore.CYAN}[*] 正在使用 {max_workers} 个线程进行 RWX 扫描...\n")
    
    found_count = 0
    
    # 第二步:多线程处理
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 将 target_arch 传递给 analyze_dll
        future_to_file = {executor.submit(analyze_dll, f, target_arch): f for f in dll_files}
        
        # 使用 tqdm 显示进度条
        with tqdm(total=total_files, unit="file", desc="扫描进度", bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") as pbar:
            for future in as_completed(future_to_file):
                result = future.result()
                pbar.update(1) # 更新进度条
                
                if result:
                    found_count += 1
                    print_result(result)

    if found_count == 0:
        print(f"\n{Fore.YELLOW}[-] 未发现符合条件(架构: {target_arch}, 包含 RWX)的 DLL。")
    else:
        print(f"\n{Fore.GREEN}[+] 扫描完成。共发现 {found_count} 个潜在目标。")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="递归扫描具备 RWX 区段的 DLL 文件 (多线程版)")
    parser.add_argument("path", help="要扫描的目录路径")
    parser.add_argument("-t", "--threads", type=int, default=20, help="扫描线程数 (默认: 20)")
    parser.add_argument("-a", "--arch", choices=["x86", "x64", "all"], default="x64", help="指定目标架构: x86, x64, 或 all (默认: x64)")
    
    args = parser.parse_args()
    
    # 修复 Windows 下路径末尾包含反斜杠导致引号被转义的问题
    # 例如命令行输入 "C:\Dir\" 可能被解析为 C:\Dir"
    target_path = args.path.strip('"').strip("'")
    
    if not os.path.isdir(target_path):
        print(f"{Fore.RED}[!] 无效的目录路径: {target_path}")
        sys.exit(1)
        
    try:
        scan_directory(target_path, max_workers=args.threads, target_arch=args.arch)
    except KeyboardInterrupt:
        print(f"\n{Fore.YELLOW}[!] 用户中断扫描。")
        sys.exit(0)

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注