找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3207

积分

0

好友

414

主题
发表于 2026-2-11 15:53:09 | 查看: 35| 回复: 0

一、PyTorch概述与设计哲学

1. PyTorch生态系统全览

PyTorch 不仅是一个张量计算库,更是一个覆盖研究、训练、部署、优化全链路的深度学习生态。其模块化设计让开发者能按需选用组件,既可快速验证想法,也能构建工业级服务。

以下代码可视化了 PyTorch 核心模块及其依赖关系:

import matplotlib.pyplot as plt

def visualize_pytorch_ecosystem():
    """可视化PyTorch生态系统"""

    components = {
        'PyTorch Core': ['张量计算', '自动微分', 'GPU加速'],
        'torch.nn': ['神经网络模块', '损失函数', '优化器'],
        'torchvision': ['计算机视觉', '数据集', '预训练模型'],
        'torchaudio': ['音频处理', '音频数据集', '音频模型'],
        'torchtext': ['文本处理', 'NLP数据集', '文本模型'],
        'TorchServe': ['模型部署', '推理服务', '模型管理'],
        'TorchScript': ['模型序列化', '跨语言部署', '性能优化'],
        'PyTorch Lightning': ['训练抽象', '实验管理', '可复现性'],
        'Hugging Face Transformers': ['预训练模型', 'NLP工具', '模型共享'],
        'Detectron2': ['目标检测', '实例分割', '姿态估计']
    }

    # 创建生态系统图
    fig, ax = plt.subplots(figsize=(14, 8))
    ax.axis('off')

    # 设置位置
    positions = {
        'PyTorch Core': (0.5, 0.8),
        'torch.nn': (0.3, 0.7),
        'torchvision': (0.2, 0.6),
        'torchaudio': (0.4, 0.6),
        'torchtext': (0.6, 0.6),
        'TorchServe': (0.8, 0.7),
        'TorchScript': (0.7, 0.5),
        'PyTorch Lightning': (0.3, 0.5),
        'Hugging Face Transformers': (0.5, 0.4),
        'Detectron2': (0.7, 0.3)
    }

    # 绘制组件
    for component, (x, y) in positions.items():
        # 绘制框
        box = plt.Rectangle((x-0.08, y-0.03), 0.16, 0.06,
                            facecolor='#FF6B6B', edgecolor='#C44D58', alpha=0.8)
        ax.add_patch(box)

        # 添加文本
        ax.text(x, y, component, ha='center', va='center',
                fontsize=9, fontweight='bold', color='white')

        # 添加功能描述
        features = components[component]
        feature_text = '\n'.join(features)
        ax.text(x, y-0.02, feature_text, ha='center', va='top',
                fontsize=7, fontstyle='italic')

    # 添加连接线
    connections = [
        ('PyTorch Core', 'torch.nn'),
        ('PyTorch Core', 'torchvision'),
        ('PyTorch Core', 'torchaudio'),
        ('PyTorch Core', 'torchtext'),
        ('PyTorch Core', 'TorchServe'),
        ('PyTorch Core', 'TorchScript'),
        ('torch.nn', 'PyTorch Lightning'),
        ('torchvision', 'Detectron2'),
        ('torchtext', 'Hugging Face Transformers')
    ]

    for start, end in connections:
        x1, y1 = positions[start]
        x2, y2 = positions[end]
        ax.annotate('', xy=(x2, y2-0.03), xytext=(x1, y1+0.03),
                    arrowprops=dict(arrowstyle='->', color='gray', alpha=0.6, lw=1.5))

    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.set_title('PyTorch生态系统架构', fontsize=16, fontweight='bold', color='#333333')
    plt.show()

visualize_pytorch_ecosystem()

该图清晰展示了 PyTorch Core 作为底层引擎,向上支撑各类领域专用库(如 torchvisiontorchaudio),并向外延伸出生产部署(TorchServe)、跨平台推理(TorchScript)、工程化封装(Lightning)等关键能力层。

2. PyTorch vs TensorFlow 对比

选择框架不应凭直觉,而应基于项目阶段、团队能力与交付目标。下表从 8 个维度展开对比:

def compare_pytorch_tensorflow():
    """PyTorch与TensorFlow详细对比"""

    comparison_data = {
        '特性': ['动态计算图', '静态计算图', 'API风格', '调试体验', '研究友好度', '生产部署', '社区生态', '学习曲线'],
        'PyTorch': [
            'Eager模式为主\n动态构建计算图\n调试方便',
            '通过TorchScript\n支持静态图',
            'Pythonic\n面向对象\n直观易用',
            '优秀的Python\n调试器支持\n实时错误检查',
            '非常高\n适合原型开发\n和学术研究',
            '良好\nTorchServe支持\n但相对较新',
            '快速增长\n研究社区强大\n开源活跃',
            '平缓\n符合Python习惯'
        ],
        'TensorFlow': [
            'Eager模式支持\n但传统是静态图',
            '静态图为主\n优化性能好',
            '函数式\n有时冗长\n有学习成本',
            '图模式调试\n复杂',
            '高\n但传统上更\n适合生产',
            '非常成熟\nTF Serving完善\n工具链完整',
            '庞大成熟\n工业界广泛使用\n企业支持强',
            '较陡峭\n概念较多'
        ]
    }

    import pandas as pd

    df = pd.DataFrame(comparison_data)
    print("PyTorch vs TensorFlow 详细对比")
    print("=" * 120)

    # 格式化输出
    for idx, row in df.iterrows():
        print(f"\n{row['特性']}:")
        print(f"  PyTorch: {row['PyTorch']}")
        print(f"  TensorFlow: {row['TensorFlow']}")

    print("\n" + "=" * 120)
    print("\n选择建议:")
    print("1. 学术研究/快速原型: 选择 PyTorch")
    print("2. 生产部署/企业应用: 考虑 TensorFlow")
    print("3. 入门学习: PyTorch 更容易上手")
    print("4. 多框架掌握: 学习两者,了解各自优势")
    print("5. 最新趋势: 两者都在互相借鉴,差距在缩小")

    return df

compare_pytorch_tensorflow()

关键结论:  

  • 若你正参与高校课题、AI竞赛或需要频繁修改网络结构(如探索新型注意力机制),PyTorch 是更自然的选择;  
  • 若你所在团队已建立成熟的 TF Serving 运维体系,且模型迭代频率低、稳定性要求极高,TensorFlow 仍具优势;  
  • 值得注意的是,PyTorch 2.0+ 的 torch.compile()TorchDynamo 已大幅补齐图优化短板,二者技术边界正持续模糊。

二、PyTorch安装与环境配置

1. 安装指南

安装方式直接影响后续开发效率与 GPU 利用率。推荐优先使用 conda 管理环境,避免 CUDA 版本冲突。

def setup_pytorch_environment():
    """PyTorch环境配置指南"""

    setups = {
        '基础安装 (CPU版本)': {
            '命令': 'pip install torch torchvision torchaudio',
            '说明': '安装CPU版本,适合学习和开发',
            '验证代码': '''import torch
print(f"PyTorch版本: {torch.__version__}")
print(f"CUDA是否可用: {torch.cuda.is_available()}")
'''
        },
        'GPU支持 (CUDA 11.8)': {
            '命令': 'pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118',
            '说明': '安装CUDA 11.8支持的GPU版本',
            '前提条件': [
                'NVIDIA GPU (Compute Capability 3.5+)',
                'CUDA Toolkit 11.8',
                'cuDNN 8.6+'
            ]
        },
        'conda安装': {
            '命令': 'conda install pytorch torchvision torchaudio pytorch-cuda=11.8 -c pytorch -c nvidia',
            '说明': '使用conda安装(推荐)',
            '优势': '自动处理依赖,环境管理方便'
        },
        'Docker安装': {
            '命令': 'docker pull pytorch/pytorch:latest',
            '说明': '使用Docker容器',
            '运行命令': 'docker run -it --gpus all pytorch/pytorch:latest python'
        },
        '特定版本': {
            '命令': 'pip install torch==2.1.0 torchvision==0.16.0 torchaudio==2.1.0',
            '说明': '安装特定版本(生产环境推荐)'
        },
        '开发版本': {
            '命令': 'pip install --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/cu118',
            '说明': '安装夜间构建版本(最新功能)'
        }
    }

    print("PyTorch安装配置指南")
    print("=" * 80)

    for setup_type, setup_info in setups.items():
        print(f"\n{setup_type}:")
        print(f"  命令: {setup_info['命令']}")
        print(f"  说明: {setup_info['说明']}")

        if '前提条件' in setup_info:
            print("  前提条件:")
            for req in setup_info['前提条件']:
                print(f"    • {req}")

        if '优势' in setup_info:
            print(f"  优势: {setup_info['优势']}")

        if '验证代码' in setup_info:
            print(f"  验证代码: {setup_info['验证代码']}")

    print("\n" + "=" * 80)
    print("\n验证安装(复制运行以下代码):")
    print("""import torch
import torchvision

# 打印版本信息
print(f"PyTorch版本: {torch.__version__}")
print(f"torchvision版本: {torchvision.__version__}")

# 检查CUDA
if torch.cuda.is_available():
    print(f"CUDA可用")
    print(f"CUDA版本: {torch.version.cuda}")
    print(f"GPU数量: {torch.cuda.device_count()}")
    print(f"当前GPU: {torch.cuda.current_device()}")
    print(f"GPU名称: {torch.cuda.get_device_name(0)}")
else:
    print("CUDA不可用,使用CPU")
""")

setup_pytorch_environment()

📌 实操建议

  • 生产环境务必指定版本号(如 torch==2.1.0),避免因自动升级引发兼容性问题;
  • 使用 Docker 时,推荐拉取 pytorch/pytorch:2.1.0-cuda11.8-cudnn8-runtime 镜像,而非 latest
  • 若需多版本共存,用 conda env create -f environment.yml 管理隔离环境。

2. GPU配置与优化

正确配置 GPU 是释放算力的前提。以下脚本涵盖设备检测、内存管理、混合精度与性能调优:

def configure_gpu_for_pytorch():
    """配置PyTorch GPU使用"""

    config_code = '''import torch
import numpy as np
import time

def configure_gpu_settings():
    """配置GPU设置"""

    print("GPU配置与优化")
    print("=" * 60)

    # 1. 检查可用GPU
    device_count = torch.cuda.device_count()
    print(f"可用GPU数量: {device_count}")

    if device_count == 0:
        print("警告: 未找到GPU,使用CPU运行")
        device = torch.device('cpu')
    else:
        device = torch.device('cuda:0')

        # 2. 显示GPU信息
        for i in range(device_count):
            gpu_name = torch.cuda.get_device_name(i)
            gpu_memory = torch.cuda.get_device_properties(i).total_memory / 1e9
            print(f"GPU {i}: {gpu_name}, 显存: {gpu_memory:.1f} GB")

        # 3. 设置当前设备
        torch.cuda.set_device(0)
        print(f"当前GPU: {torch.cuda.current_device()}")

        # 4. 清空GPU缓存
        torch.cuda.empty_cache()

        # 5. 设置cuDNN基准
        torch.backends.cudnn.benchmark = True
        torch.backends.cudnn.enabled = True

        # 6. 设置随机种子(确保可复现性)
        torch.manual_seed(42)
        torch.cuda.manual_seed(42)
        torch.cuda.manual_seed_all(42)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

    # 7. 自动混合精度训练
    try:
        from torch.cuda.amp import autocast, GradScaler
        scaler = GradScaler()
        print("自动混合精度训练已启用")
    except:
        scaler = None
        print("自动混合精度训练不可用")

    return device, scaler

def check_gpu_performance(device):
    """检查GPU性能"""

    print("\\nGPU性能测试:")

    # 创建测试张量
    size = 5000
    a = torch.randn(size, size, device=device)
    b = torch.randn(size, size, device=device)

    # 预热
    _ = torch.mm(a, b)
    torch.cuda.synchronize()

    # 测量矩阵乘法时间
    times = []
    for _ in range(10):
        start = time.time()
        c = torch.mm(a, b)
        torch.cuda.synchronize()  # 确保计算完成
        times.append(time.time() - start)

    avg_time = np.mean(times) * 1000  # 转换为毫秒
    print(f"矩阵乘法 ({size}x{size}) 平均耗时: {avg_time:.2f} ms")
    print(f"结果形状: {c.shape}")

    # 内存使用情况
    if device.type == 'cuda':
        memory_allocated = torch.cuda.memory_allocated() / 1e6
        memory_reserved = torch.cuda.memory_reserved() / 1e6
        print(f"已分配显存: {memory_allocated:.1f} MB")
        print(f"已保留显存: {memory_reserved:.1f} MB")

    return avg_time

def memory_management_demo():
    """内存管理演示"""

    print("\\n内存管理演示:")

    # 1. 监控内存使用
    if torch.cuda.is_available():
        # 清空缓存
        torch.cuda.empty_cache()

        # 分配大张量
        print("分配大张量...")
        big_tensor = torch.randn(10000, 10000, device='cuda')
        print(f"分配后显存: {torch.cuda.memory_allocated() / 1e6:.1f} MB")

        # 删除张量
        del big_tensor
        torch.cuda.empty_cache()
        print(f"删除后显存: {torch.cuda.memory_allocated() / 1e6:.1f} MB")

        # 2. 使用with torch.no_grad()减少内存
        print("\\n使用torch.no_grad()减少内存:")

        with torch.no_grad():
            x = torch.randn(1000, 1000, device='cuda')
            y = torch.randn(1000, 1000, device='cuda')
            z = x @ y  # 不会保存计算图

        print(f"no_grad模式下显存: {torch.cuda.memory_allocated() / 1e6:.1f} MB")

# 运行配置
device, scaler = configure_gpu_settings()
check_gpu_performance(device)
memory_management_demo()
'''

    print(config_code)

    print("\n常见GPU问题解决方案:")
    print("1. CUDA版本不匹配: 使用对应CUDA版本的PyTorch")
    print("2. 显存不足: 减小batch_size,使用梯度累积")
    print("3. 多GPU训练: 使用torch.nn.DataParallel或DistributedDataParallel")
    print("4. 性能优化: 启用cudnn.benchmark,使用混合精度训练")
    print("5. 内存泄漏: 及时del张量,使用torch.cuda.empty_cache()")

configure_gpu_for_pytorch()

💡 高频技巧

  • torch.cuda.empty_cache() 不会释放已分配但未使用的显存,仅清空缓存池;
  • torch.backends.cudnn.benchmark = True 在首次运行时自动寻找最优卷积算法,适合输入尺寸固定的场景;
  • 混合精度(AMP)可将训练速度提升 1.5–3 倍,同时降低显存占用约 50%,强烈推荐开启。

三、PyTorch核心概念

1. 张量(Tensor)基础

张量是 PyTorch 的数据基石。理解其创建、属性与操作,是掌握整个框架的第一步。

import torch
import numpy as np

def tensor_basics():
    """PyTorch张量基础"""

    print("=" * 60)
    print("PyTorch张量基础")
    print("=" * 60)

    # 1. 创建张量
    print("\n1. 创建张量:")

    # 标量 (0维张量)
    scalar = torch.tensor(42)
    print(f"标量: {scalar}, 形状: {scalar.shape}, 数据类型: {scalar.dtype}")

    # 向量 (1维张量)
    vector = torch.tensor([1, 2, 3, 4, 5])
    print(f"向量: {vector}, 形状: {vector.shape}")

    # 矩阵 (2维张量)
    matrix = torch.tensor([[1, 2], [3, 4], [5, 6]])
    print(f"矩阵: {matrix}, 形状: {matrix.shape}")

    # 3维张量
    tensor_3d = torch.tensor([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])
    print(f"3维张量: 形状: {tensor_3d.shape}")

    # 2. 特殊张量
    print("\n2. 特殊张量:")

    # 零张量
    zeros = torch.zeros(2, 3)
    print(f"零张量:\n{zeros}")

    # 一张量
    ones = torch.ones(3, 2)
    print(f"一张量:\n{ones}")

    # 单位矩阵
    eye = torch.eye(3)
    print(f"单位矩阵:\n{eye}")

    # 随机张量
    random_normal = torch.randn(2, 2)  # 标准正态分布
    print(f"正态分布随机张量:\n{random_normal}")

    random_uniform = torch.rand(2, 2)  # 均匀分布[0, 1)
    print(f"均匀分布随机张量:\n{random_uniform}")

    # 3. 张量属性
    print("\n3. 张量属性:")

    tensor = torch.tensor([[1, 2, 3], [4, 5, 6]], dtype=torch.float32)
    print(f"张量:\n{tensor}")
    print(f"形状: {tensor.shape}")
    print(f"数据类型: {tensor.dtype}")
    print(f"维度数: {tensor.dim()}")
    print(f"元素总数: {tensor.numel()}")
    print(f"设备: {tensor.device}")
    print(f"是否requires_grad: {tensor.requires_grad}")
    print(f"转换为NumPy:\n{tensor.numpy()}")

    # 4. 张量操作
    print("\n4. 张量操作:")

    a = torch.tensor([[1, 2], [3, 4]], dtype=torch.float32)
    b = torch.tensor([[5, 6], [7, 8]], dtype=torch.float32)

    print(f"加法:\n{a + b}")
    print(f"乘法:\n{a * b}")
    print(f"矩阵乘法:\n{torch.matmul(a, b)}")
    print(f"点积:\n{torch.dot(a.flatten(), b.flatten())}")

    # 重塑
    original = torch.tensor([1, 2, 3, 4, 5, 6])
    reshaped = original.view(2, 3)
    print(f"重塑前: {original.shape}, 重塑后: {reshaped.shape}")

    # 转置
    transposed = reshaped.t()
    print(f"转置: {transposed.shape}")

    # 拼接
    concat = torch.cat([a, b], dim=0)
    print(f"拼接 (dim=0):\n{concat}")

    # 5. 广播
    print("\n5. 广播机制:")

    x = torch.tensor([1, 2, 3])
    y = torch.tensor([[10], [20], [30]])

    print(f"x: {x.shape}, y: {y.shape}")
    print(f"x + y:\n{x + y}")

    # 6. 张量与NumPy互操作
    print("\n6. 张量与NumPy互操作:")

    # PyTorch张量 -> NumPy数组
    torch_tensor = torch.randn(3, 3)
    numpy_array = torch_tensor.numpy()
    print(f"PyTorch张量 -> NumPy数组:\n{numpy_array}")

    # NumPy数组 -> PyTorch张量
    numpy_array = np.random.randn(3, 3)
    torch_tensor = torch.from_numpy(numpy_array)
    print(f"NumPy数组 -> PyTorch张量:\n{torch_tensor}")

    # 注意:共享内存!
    print(f"共享内存: {torch_tensor.data_ptr() == torch.from_numpy(numpy_array).data_ptr()}")

    return tensor

tensor_basics()

⚠️ 关键细节

  • torch.tensor() 默认不共享内存;torch.from_numpy() 默认共享内存(修改一方会影响另一方);
  • view() 要求张量内存连续,否则报错;reshape() 更鲁棒,会自动拷贝;
  • .item() 仅适用于单元素张量,用于提取 Python 标量值。

2. 自动微分(Autograd)

PyTorch 的 autograd 是其“动态图”灵魂所在。它让反向传播完全隐式、无需手动推导公式。

def autograd_demo():
    """PyTorch自动微分演示"""

    print("=" * 80)
    print("PyTorch自动微分 (Autograd)")
    print("=" * 80)

    # 1. 基本自动微分
    print("\n1. 基本自动微分:")

    basic_code = '''# 创建需要梯度的张量
x = torch.tensor(2.0, requires_grad=True)
y = torch.tensor(3.0, requires_grad=True)

# 定义计算
z = x**2 + y**3 + x*y

print(f"x = {x.item()}, y = {y.item()}")
print(f"z = x^2 + y^3 + x*y = {z.item()}")

# 计算梯度
z.backward()

print(f"∂z/∂x = {x.grad.item()}")  # 2x + y = 2*2 + 3 = 7
print(f"∂z/∂y = {y.grad.item()}")  # 3y^2 + x = 3*9 + 2 = 29

# 验证
print(f"验证 ∂z/∂x: 2*{x.item()} + {y.item()} = {2*x.item() + y.item()}")
print(f"验证 ∂z/∂y: 3*{y.item()}^2 + {x.item()} = {3*y.item()**2 + x.item()}")
'''

    print(basic_code)

    # 2. 计算图可视化
    print("\n2. 计算图可视化:")

    graph_code = '''# 创建更复杂的计算图
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
w = torch.tensor([0.5, 0.3, 0.2], requires_grad=True)
b = torch.tensor(1.0, requires_grad=True)

# 线性变换
y = torch.sum(x * w) + b

# 激活函数
z = torch.sigmoid(y)

print(f"输入 x: {x}")
print(f"权重 w: {w}")
print(f"偏置 b: {b}")
print(f"线性输出 y = Σ(x*w) + b = {y.item()}")
print(f"Sigmoid输出 z = σ(y) = {z.item()}")

# 计算梯度
z.backward()

print(f"\\n梯度:")
print(f"∂z/∂x = {x.grad}")
print(f"∂z/∂w = {w.grad}")
print(f"∂z/∂b = {b.grad}")

# 手动验证梯度
sigmoid_y = torch.sigmoid(y)
dz_dy = sigmoid_y * (1 - sigmoid_y)  # σ'(y) = σ(y)(1-σ(y))
print(f"\\n手动验证:")
print(f"∂z/∂y = σ(y)(1-σ(y)) = {dz_dy.item()}")
print(f"∂z/∂x = ∂z/∂y * ∂y/∂x = {dz_dy.item()} * w = {dz_dy.item() * w}")
'''

    print(graph_code)

    # 3. 梯度控制
    print("\n3. 梯度控制:")

    control_code = '''# 1. 禁用梯度计算
print("禁用梯度计算:")
with torch.no_grad():
    x = torch.tensor([1.0, 2.0, 3.0])
    y = x * 2
    print(f"x = {x}")
    print(f"y = x * 2 = {y}")
    print(f"y.requires_grad = {y.requires_grad}")

# 2. 分离张量(从计算图中分离)
print("\\n分离张量:")
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = x * 2
z = y.detach() * 3  # y.detach() 返回不需要梯度的新张量

print(f"x.requires_grad = {x.requires_grad}")
print(f"y.requires_grad = {y.requires_grad}")
print(f"z.requires_grad = {z.requires_grad}")

# 3. 梯度累积
print("\\n梯度累积:")
x = torch.tensor(2.0, requires_grad=True)

# 多次前向传播,累积梯度
for i in range(3):
    y = x ** 2
    y.backward(retain_graph=True)  # 保留计算图
    print(f"第 {i+1} 次反向传播,梯度: {x.grad.item()}")

# 清空梯度
x.grad.zero_()
print(f"清空梯度后: {x.grad.item()}")

# 4. 自定义梯度函数
print("\\n自定义梯度函数:")

class MyReLU(torch.autograd.Function):
    """自定义ReLU函数,带自定义反向传播"""

    @staticmethod
    def forward(ctx, input):
        ctx.save_for_backward(input)  # 保存输入用于反向传播
        return input.clamp(min=0)

    @staticmethod
    def backward(ctx, grad_output):
        input, = ctx.saved_tensors
        grad_input = grad_output.clone()
        grad_input[input < 0] = 0  # ReLU的梯度
        return grad_input

# 使用自定义函数
x = torch.tensor([-1.0, 0.5, 2.0, -0.3], requires_grad=True)
y = MyReLU.apply(x)

print(f"输入: {x}")
print(f"MyReLU输出: {y}")

y.sum().backward()
print(f"梯度: {x.grad}")
'''

    print(control_code)

    # 4. 高阶导数
    print("\n4. 高阶导数:")

    high_order_code = '''# 计算二阶导数
x = torch.tensor(3.0, requires_grad=True)
y = x**3 + 2*x**2 + x + 1

# 一阶导数
first_derivative = torch.autograd.grad(y, x, create_graph=True)[0]
print(f"y = x^3 + 2x^2 + x + 1")
print(f"x = {x.item()}")
print(f"y = {y.item()}")
print(f"dy/dx = {first_derivative.item()}")

# 二阶导数
second_derivative = torch.autograd.grad(first_derivative, x)[0]
print(f"d²y/dx² = {second_derivative.item()}")

# 验证
print(f"\\n验证:")
print(f"dy/dx = 3x^2 + 4x + 1 = {3*x.item()**2 + 4*x.item() + 1}")
print(f"d²y/dx² = 6x + 4 = {6*x.item() + 4}")
'''

    print(high_order_code)

    # 5. 实战示例:线性回归梯度
    print("\n5. 实战示例:线性回归梯度:")

    regression_code = '''# 线性回归的梯度计算
torch.manual_seed(42)

# 生成数据
n_samples = 100
X = torch.randn(n_samples, 1)  # 特征
true_w = 2.5
true_b = 1.0
y = true_w * X + true_b + torch.randn(n_samples, 1) * 0.1  # 添加噪声

# 初始化参数
w = torch.tensor(0.0, requires_grad=True)
b = torch.tensor(0.0, requires_grad=True)

# 学习率
learning_rate = 0.01

# 训练循环(手动梯度下降)
print("梯度下降训练线性回归:")
for epoch in range(100):
    # 前向传播
    y_pred = w * X + b

    # 计算损失(均方误差)
    loss = torch.mean((y_pred - y) ** 2)

    # 反向传播
    loss.backward()

    # 更新参数(手动)
    with torch.no_grad():
        w -= learning_rate * w.grad
        b -= learning_rate * b.grad

        # 清空梯度
        w.grad.zero_()
        b.grad.zero_()

    if epoch % 20 == 0:
        print(f"Epoch {epoch:3d}: w = {w.item():.4f}, b = {b.item():.4f}, Loss = {loss.item():.4f}")

print(f"\\n真实参数: w = {true_w}, b = {true_b}")
print(f"学习参数: w = {w.item():.4f}, b = {b.item():.4f}")
'''

    print(regression_code)

    return None

autograd_demo()

🔍 原理精要

  • requires_grad=True 是开启梯度追踪的开关,只有叶子节点(用户创建的张量)才需显式设置;
  • retain_graph=True 防止反向传播后释放计算图,允许多次调用 .backward()
  • torch.no_grad() 是推理阶段的黄金守则,可节省 30%+ 显存并提速。

四、PyTorch神经网络模块

1. torch.nn.Module基础

nn.Module 是所有神经网络的基类。掌握其生命周期(__init__forward)、参数注册与容器使用,是构建任意复杂模型的基础。

def nn_module_basics():
    """PyTorch神经网络模块基础"""

    print("=" * 80)
    print("PyTorch torch.nn 模块")
    print("=" * 80)

    # 1. 基础Module类
    print("\n1. 基础Module类:")

    basic_module_code = '''import torch
import torch.nn as nn
import torch.nn.functional as F

class SimpleNet(nn.Module):
    """简单的全连接神经网络"""

    def __init__(self, input_size=784, hidden_size=128, output_size=10):
        super().__init__()  # 必须调用父类初始化

        # 定义网络层
        self.fc1 = nn.Linear(input_size, hidden_size)  # 全连接层1
        self.fc2 = nn.Linear(hidden_size, hidden_size // 2)  # 全连接层2
        self.fc3 = nn.Linear(hidden_size // 2, output_size)  # 输出层

        # Dropout层
        self.dropout = nn.Dropout(p=0.2)

        # BatchNorm层
        self.batchnorm = nn.BatchNorm1d(hidden_size // 2)

    def forward(self, x):
        """前向传播"""

        # 展平输入(如果是图像)
        x = x.view(x.size(0), -1)

        # 第一层 + ReLU激活 + Dropout
        x = F.relu(self.fc1(x))
        x = self.dropout(x)

        # 第二层 + ReLU激活 + BatchNorm
        x = F.relu(self.fc2(x))
        x = self.batchnorm(x)
        x = self.dropout(x)

        # 输出层(无激活函数,用于分类)
        x = self.fc3(x)

        return x

# 创建模型实例
model = SimpleNet(input_size=784, hidden_size=256, output_size=10)
print("模型结构:")
print(model)

# 前向传播
x = torch.randn(32, 784)  # 批量大小32,特征784
output = model(x)
print(f"\\n输入形状: {x.shape}")
print(f"输出形状: {output.shape}")

# 模型参数
print(f"\\n模型参数数量: {sum(p.numel() for p in model.parameters())}")
print(f"可训练参数数量: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")

# 访问特定层
print(f"\\n第一层权重形状: {model.fc1.weight.shape}")
print(f"第一层偏置形状: {model.fc1.bias.shape}")
'''

    print(basic_module_code)

    # 2. 常用层类型
    print("\n2. 常用层类型:")

    layers_code = '''# 常用层类型示例
layers_examples = {
    '线性层': nn.Linear(in_features=10, out_features=5),
    '卷积层': nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3),
    '池化层': nn.MaxPool2d(kernel_size=2, stride=2),
    '循环层': nn.LSTM(input_size=10, hidden_size=20, num_layers=2),
    '批归一化': nn.BatchNorm2d(num_features=16),
    'Dropout': nn.Dropout(p=0.5),
    '嵌入层': nn.Embedding(num_embeddings=1000, embedding_dim=50),
    '转置卷积': nn.ConvTranspose2d(in_channels=16, out_channels=3, kernel_size=3),
    '层归一化': nn.LayerNorm(normalized_shape=128),
    '实例归一化': nn.InstanceNorm2d(num_features=16),
    '组归一化': nn.GroupNorm(num_groups=4, num_channels=16),
    '自适应池化': nn.AdaptiveAvgPool2d(output_size=(7, 7))
}

print("常用神经网络层:")
for name, layer in layers_examples.items():
    print(f"  • {name}: {layer}")
    if hasattr(layer, 'weight'):
        print(f"    权重形状: {layer.weight.shape if hasattr(layer.weight, 'shape') else 'N/A'}")

# 激活函数
activations = {
    'ReLU': nn.ReLU(),
    'LeakyReLU': nn.LeakyReLU(negative_slope=0.01),
    'Sigmoid': nn.Sigmoid(),
    'Tanh': nn.Tanh(),
    'Softmax': nn.Softmax(dim=1),
    'LogSoftmax': nn.LogSoftmax(dim=1),
    'GELU': nn.GELU(),
    'SiLU': nn.SiLU(),  # 也称为Swish
    'Mish': nn.Mish(),
    'ELU': nn.ELU(alpha=1.0)
}

print("\\n常用激活函数:")
for name, activation in activations.items():
    print(f"  • {name}: {activation}")
'''

    print(layers_code)

    # 3. 损失函数
    print("\n3. 损失函数:")

    loss_functions_code = '''# 常用损失函数
loss_functions = {
    '均方误差': nn.MSELoss(),
    '平均绝对误差': nn.L1Loss(),
    '交叉熵损失': nn.CrossEntropyLoss(),
    '二元交叉熵': nn.BCELoss(),
    '带logits的二元交叉熵': nn.BCEWithLogitsLoss(),
    '负对数似然': nn.NLLLoss(),
    'KL散度': nn.KLDivLoss(),
    'Huber损失': nn.SmoothL1Loss(),
    '余弦相似度': nn.CosineEmbeddingLoss(),
    'MarginRanking损失': nn.MarginRankingLoss(),
    '多标签Margin损失': nn.MultiLabelMarginLoss(),
    'Triplet损失': nn.TripletMarginLoss(),
    'CTCLoss': nn.CTCLoss()  # 连接主义时序分类
}

print("常用损失函数:")
for name, loss_fn in loss_functions.items():
    print(f"  • {name}: {loss_fn}")

# 损失函数使用示例
print("\\n损失函数使用示例:")

# 分类问题
criterion_ce = nn.CrossEntropyLoss()
outputs = torch.randn(4, 3)  # 4个样本,3个类别
targets = torch.tensor([0, 2, 1, 0])  # 真实标签
loss = criterion_ce(outputs, targets)
print(f"交叉熵损失: {loss.item():.4f}")

# 回归问题
criterion_mse = nn.MSELoss()
predictions = torch.tensor([1.2, 2.3, 3.4])
targets_reg = torch.tensor([1.0, 2.0, 3.0])
loss_mse = criterion_mse(predictions, targets_reg)
print(f"均方误差损失: {loss_mse.item():.4f}")
'''

    print(loss_functions_code)

    # 4. 优化器
    print("\n4. 优化器:")

    optimizers_code = '''import torch.optim as optim

# 常用优化器
optimizers = {
    'SGD': optim.SGD,
    'Momentum': lambda params: optim.SGD(params, momentum=0.9),
    'Adam': optim.Adam,
    'AdamW': optim.AdamW,
    'RMSprop': optim.RMSprop,
    'Adagrad': optim.Adagrad,
    'Adadelta': optim.Adadelta,
    'Adamax': optim.Adamax,
    'NAdam': optim.NAdam,
    'RAdam': optim.RAdam
}

print("常用优化器:")
for name, optimizer_class in optimizers.items():
    print(f"  • {name}")

# 优化器使用示例

# 创建简单模型
model = nn.Sequential(
    nn.Linear(10, 20),
    nn.ReLU(),
    nn.Linear(20, 1)
)

# 创建优化器
optimizer_adam = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999))
optimizer_sgd = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

print(f"\\nAdam优化器参数:")
for param_group in optimizer_adam.param_groups:
    print(f"  学习率: {param_group['lr']}")
    print(f"  betas: {param_group['betas']}")

print(f"\\nSGD优化器参数:")
for param_group in optimizer_sgd.param_groups:
    print(f"  学习率: {param_group['lr']}")
    print(f"  动量: {param_group['momentum']}")

# 学习率调度器
schedulers = {
    'StepLR': optim.lr_scheduler.StepLR(optimizer_adam, step_size=30, gamma=0.1),
    'MultiStepLR': optim.lr.scheduler.MultiStepLR(optimizer_adam, milestones=[30, 80], gamma=0.1),
    'ExponentialLR': optim.lr_scheduler.ExponentialLR(optimizer_adam, gamma=0.95),
    'CosineAnnealingLR': optim.lr_scheduler.CosineAnnealingLR(optimizer_adam, T_max=100),
    'ReduceLROnPlateau': optim.lr_scheduler.ReduceLROnPlateau(optimizer_adam, mode='min', patience=5),
    'CyclicLR': optim.lr_scheduler.CyclicLR(optimizer_adam, base_lr=0.001, max_lr=0.01, step_size_up=20)
}

print("\\n学习率调度器:")
for name, scheduler in schedulers.items():
    print(f"  • {name}: {scheduler}")
'''

    print(optimizers_code)

    # 5. 容器模块
    print("\n5. 容器模块:")

    containers_code = '''# 容器模块:组合多个模块
print("容器模块示例:")

# 1. Sequential - 顺序容器
sequential_model = nn.Sequential(
    nn.Conv2d(3, 16, kernel_size=3, padding=1),
    nn.BatchNorm2d(16),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Conv2d(16, 32, kernel_size=3, padding=1),
    nn.BatchNorm2d(32),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Flatten(),
    nn.Linear(32 * 8 * 8, 128),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(128, 10)
)

print("Sequential模型:")
print(sequential_model)

# 测试Sequential模型
x = torch.randn(4, 3, 32, 32)  # 4个样本,3通道,32x32
output = sequential_model(x)
print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")

# 2. ModuleList - 模块列表
class ModuleListNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(10, 20),
            nn.Linear(20, 30),
            nn.Linear(30, 40)
        ])
        self.activations = nn.ModuleList([
            nn.ReLU(),
            nn.ReLU(),
            nn.Sigmoid()
        ])

    def forward(self, x):
        for layer, activation in zip(self.layers, self.activations):
            x = activation(layer(x))
        return x

modulelist_model = ModuleListNet()
print(f"\\nModuleList模型参数数量: {sum(p.numel() for p in modulelist_model.parameters())}")

# 3. ModuleDict - 模块字典
class ModuleDictNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.operations = nn.ModuleDict({
            'conv1': nn.Conv2d(3, 16, 3),
            'conv2': nn.Conv2d(16, 32, 3),
            'pool': nn.MaxPool2d(2),
            'fc': nn.Linear(32 * 14 * 14, 10)
        })

    def forward(self, x):
        x = self.operations['conv1'](x)
        x = self.operations['pool'](x)
        x = self.operations['conv2'](x)
        x = self.operations['pool'](x)
        x = x.view(x.size(0), -1)
        x = self.operations['fc'](x)
        return x

moduledict_model = ModuleDictNet()
print(f"ModuleDict模型参数数量: {sum(p.numel() for p in moduledict_model.parameters())}")

# 4. 参数容器
class ParameterNet(nn.Module):
    def __init__(self):
        super().__init__()
        # 参数张量(自动注册为参数)
        self.weight = nn.Parameter(torch.randn(10, 20))
        self.bias = nn.Parameter(torch.zeros(20))

        # ParameterList
        self.weights_list = nn.ParameterList([
            nn.Parameter(torch.randn(10, 10)),
            nn.Parameter(torch.randn(10, 10))
        ])

        # ParameterDict
        self.weights_dict = nn.ParameterDict({
            'weight1': nn.Parameter(torch.randn(10, 10)),
            'weight2': nn.Parameter(torch.randn(10, 10))
        })

    def forward(self, x):
        return x @ self.weight + self.bias

param_model = ParameterNet()
print(f"\\nParameter模型参数数量: {sum(p.numel() for p in param_model.parameters())}")
'''

    print(containers_code)

    return None

nn_module_basics()

🧠 设计哲学

  • nn.Sequential 适合线性流程,但无法表达分支、循环等复杂拓扑;
  • nn.ModuleList / nn.ModuleDict 是动态网络(如 NAS、动态路由)的基石;
  • nn.Parameter 显式声明可学习参数,是实现自定义层的核心。

2. 数据加载与处理

高效的数据管道是训练稳定性的保障。torch.utils.data 提供了灵活、可扩展的抽象。

def data_loading_pipeline():
    """PyTorch数据加载与处理"""

    print("=" * 80)
    print("PyTorch数据加载与处理")
    print("=" * 80)

    # 1. 基础Dataset和DataLoader
    print("\n1. 基础Dataset和DataLoader:")

    basic_dataloader_code = '''import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset
import numpy as np

# 1.1 使用TensorDataset(简单情况)
print("TensorDataset示例:")

# 创建模拟数据
x = torch.randn(1000, 10)  # 1000个样本,10个特征
y = torch.randint(0, 2, (1000, 1)).float()  # 二分类标签

# 创建TensorDataset
dataset = TensorDataset(x, y)
print(f"数据集大小: {len(dataset)}")
print(f"样本形状: {dataset[0][0].shape}, 标签形状: {dataset[0][1].shape}")

# 创建DataLoader
dataloader = DataLoader(
    dataset,
    batch_size=32,
    shuffle=True,
    num_workers=2,  # 并行加载数据的工作进程数
    pin_memory=True  # 如果使用GPU,可以加速数据传输
)

print(f"DataLoader批次数量: {len(dataloader)}")

# 遍历DataLoader
for batch_idx, (batch_x, batch_y) in enumerate(dataloader):
    if batch_idx == 0:
        print(f"第一个批次: 输入形状: {batch_x.shape}, 标签形状: {batch_y.shape}")
    break

# 1.2 自定义Dataset
print("\\n自定义Dataset示例:")

class CustomDataset(Dataset):
    """自定义数据集类"""

    def __init__(self, data_path, transform=None):
        """
        初始化数据集

        Args:
            data_path: 数据路径
            transform: 数据转换函数
        """
        # 这里应该加载数据
        # 为了示例,我们生成随机数据
        self.data = torch.randn(1000, 3, 32, 32)  # 1000张32x32 RGB图像
        self.labels = torch.randint(0, 10, (1000,))  # 10个类别
        self.transform = transform

    def __len__(self):
        """返回数据集大小"""
        return len(self.data)

    def __getitem__(self, idx):
        """获取单个样本"""

        image = self.data[idx]
        label = self.labels[idx]

        # 应用变换(如果有)
        if self.transform:
            image = self.transform(image)

        return image, label

# 创建自定义数据集
custom_dataset = CustomDataset("fake_path")
print(f"自定义数据集大小: {len(custom_dataset)}")

# 2. 数据变换(Transforms)
print("\\n2. 数据变换(Transforms):")

import torchvision.transforms as transforms

# 图像数据变换
image_transforms = transforms.Compose([
    transforms.Resize((224, 224)),  # 调整大小
    transforms.RandomHorizontalFlip(p=0.5),  # 随机水平翻转
    transforms.RandomRotation(degrees=15),  # 随机旋转
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),  # 颜色抖动
    transforms.ToTensor(),  # 转换为张量
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # 归一化(ImageNet统计)
                         std=[0.229, 0.224, 0.225])
])

print("图像变换管道:")
for transform in image_transforms.transforms:
    print(f"  • {transform}")

# 文本数据变换示例
print("\\n文本数据变换示例:")
text_transforms = transforms.Compose([
    # 在实际中,这里可能是tokenizer
    lambda x: x.lower(),  # 转换为小写
    lambda x: x.strip(),  # 去除空白
    # 添加更多文本处理步骤...
])

# 3. torchvision数据集
print("\\n3. torchvision内置数据集:")

# 注意:实际使用时需要下载数据
try:
    import torchvision
    from torchvision import datasets

    # MNIST数据集示例
    mnist_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    # 训练集
    mnist_train = datasets.MNIST(
        root='./data',
        train=True,
        download=True,  # 如果数据不存在则下载
        transform=mnist_transform
    )

    # 测试集
    mnist_test = datasets.MNIST(
        root='./data',
        train=False,
        download=True,
        transform=mnist_transform
    )

    print(f"MNIST训练集大小: {len(mnist_train)}")
    print(f"MNIST测试集大小: {len(mnist_test)}")

    # 其他常用数据集
    datasets_list = [
        ('CIFAR-10', datasets.CIFAR10),
        ('CIFAR-100', datasets.CIFAR100),
        ('ImageNet', datasets.ImageNet),
        ('FashionMNIST', datasets.FashionMNIST),
        ('COCO', datasets.CocoDetection),
        ('VOC', datasets.VOCDetection)
    ]

    print("\\n其他torchvision数据集:")
    for name, dataset_class in datasets_list:
        print(f"  • {name}")

except ImportError:
    print("torchvision不可用,请安装: pip install torchvision")
'''

    print(basic_dataloader_code)

    # 2. 高级数据加载技巧
    print("\n2. 高级数据加载技巧:")

    advanced_dataloader_code = '''# 1. 自定义collate_fn
print("自定义collate_fn示例:")

def custom_collate_fn(batch):
    """自定义批次处理函数"""

    # batch是列表,元素为(__getitem__返回的样本, 标签)
    images = []
    labels = []
    additional_info = []

    for item in batch:
        # 假设每个样本返回 (image, label, info)
        image, label, info = item
        images.append(image)
        labels.append(label)
        additional_info.append(info)

    # 堆叠张量
    images = torch.stack(images, dim=0)
    labels = torch.tensor(labels)

    return images, labels, additional_info

# 2. 采样器(Sampler)
from torch.utils.data import Sampler, RandomSampler, SequentialSampler, WeightedRandomSampler

print("\\n采样器示例:")

# 随机采样器
random_sampler = RandomSampler(mnist_train)
print(f"随机采样器: {random_sampler}")

# 顺序采样器
sequential_sampler = SequentialSampler(mnist_train)
print(f"顺序采样器: {sequential_sampler}")

# 加权随机采样器(用于处理类别不平衡)
weights = [1.0] * len(mnist_train)  # 这里应该是每个样本的权重
weighted_sampler = WeightedRandomSampler(weights, num_samples=1000, replacement=True)
print(f"加权随机采样器: {weighted_sampler}")

# 3. 批采样器(BatchSampler)
from torch.utils.data import BatchSampler

batch_sampler = BatchSampler(random_sampler, batch_size=32, drop_last=False)
print(f"批采样器: {batch_sampler}")

# 4. 分布式采样器
from torch.utils.data.distributed import DistributedSampler

# 分布式采样器(用于多GPU训练)
# distributed_sampler = DistributedSampler(mnist_train)
# print(f"分布式采样器: {distributed_sampler}")

# 5. DataLoader高级参数
advanced_dataloader = DataLoader(
    mnist_train,
    batch_size=64,
    shuffle=True,
    num_workers=4,  # 并行工作进程数
    pin_memory=True,  # 加速GPU数据传输
    prefetch_factor=2,  # 预取因子
    persistent_workers=True,  # 保持工作进程活跃
    drop_last=True  # 丢弃最后一个不完整的批次
)

print(f"\\nDataLoader高级参数:")
print(f"  batch_size: {advanced_dataloader.batch_size}")
print(f"  num_workers: {advanced_dataloader.num_workers}")
print(f"  pin_memory: {advanced_dataloader.pin_memory}")
print(f"  drop_last: {advanced_dataloader.drop_last}")

# 6. 迭代DataLoader的多种方式
print("\\n迭代DataLoader的多种方式:")

# 方式1: 直接迭代
print("方式1: 直接迭代")
for batch in advanced_dataloader:
    images, labels = batch
    print(f"批次形状: {images.shape}")
    break

# 方式2: 使用enumerate获取批次索引
print("\\n方式2: 使用enumerate")
for batch_idx, (images, labels) in enumerate(advanced_dataloader):
    if batch_idx == 0:
        print(f"批次 {batch_idx}: {images.shape}")
    break

# 方式3: 使用tqdm显示进度条
print("\\n方式3: 使用tqdm进度条")
try:
    from tqdm import tqdm

    # 创建进度条
    pbar = tqdm(advanced_dataloader, desc="训练", total=len(advanced_dataloader))

    for images, labels in pbar:
        # 在这里进行训练
        # pbar.set_description(f"训练 (Loss: {loss:.4f})")
        break

    print("使用tqdm显示进度条")

except ImportError:
    print("tqdm未安装,使用: pip install tqdm")
'''

    print(advanced_dataloader_code)

    # 3. 实战示例:完整数据管道
    print("\n3. 实战示例:完整数据管道:")

    practical_pipeline_code = '''import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from PIL import Image
import os

class ImageClassificationDataset(Dataset):
    """图像分类数据集"""

    def __init__(self, root_dir, transform=None, split='train'):
        """
        初始化数据集

        Args:
            root_dir: 数据根目录
            transform: 数据变换
            split: 'train' 或 'val'
        """
        self.root_dir = root_dir
        self.transform = transform
        self.split = split

        # 组织数据
        self.image_paths = []
        self.labels = []
        self.class_to_idx = {}

        # 假设目录结构为:
        # root_dir/
        #   train/
        #       class1/
        #           img1.jpg
        #           img2.jpg
        #       class2/
        #           img1.jpg
        #   val/
        #       class1/
        #       class2/

        split_dir = os.path.join(root_dir, split)

        if os.path.exists(split_dir):
            # 获取类别
            classes = sorted(os.listdir(split_dir))
            self.class_to_idx = {cls_name: idx for idx, cls_name in enumerate(classes)}

            # 遍历所有类别
            for class_name in classes:
                class_dir = os.path.join(split_dir, class_name)
                class_idx = self.class_to_idx[class_name]

                # 遍历类别的所有图像
                for img_name in os.listdir(class_dir):
                    if img_name.lower().endswith(('.png', '.jpg', '.jpeg')):
                        img_path = os.path.join(class_dir, img_name)
                        self.image_paths.append(img_path)
                        self.labels.append(class_idx)
        else:
            print(f"警告: 目录 {split_dir} 不存在")

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        # 加载图像
        try:
            image = Image.open(img_path).convert('RGB')
        except Exception as e:
            print(f"无法加载图像 {img_path}: {e}")
            # 返回占位符
            image = Image.new('RGB', (224, 224), color='white')

        # 应用变换
        if self.transform:
            image = self.transform(image)

        return image, label

    def get_class_distribution(self):
        """获取类别分布"""
        from collections import Counter
        return Counter(self.labels)

# 数据增强变换
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# 验证集变换(不需要数据增强)
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

def create_data_loaders(data_dir, batch_size=32, num_workers=4):
    """创建数据加载器"""

    # 创建数据集
    train_dataset = ImageClassificationDataset(
        root_dir=data_dir,
        transform=train_transform,
        split='train'
    )

    val_dataset = ImageClassificationDataset(
        root_dir=data_dir,
        transform=val_transform,
        split='val'
    )

    print(f"训练集大小: {len(train_dataset)}")
    print(f"验证集大小: {len(val_dataset)}")
    print(f"类别数量: {len(train_dataset.class_to_idx)}")

    # 创建数据加载器
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True,
        drop_last=True
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True
    )

    return train_loader, val_loader, train_dataset.class_to_idx

# 使用示例
print("完整数据管道示例:")
print("=" * 50)

# 注意:这里使用虚拟目录,实际使用时需要真实数据
# train_loader, val_loader, class_to_idx = create_data_loaders('./data')

print("数据管道创建完成!")
print("包含以下功能:")
print("1. 从目录结构自动加载图像和标签")
print("2. 训练集和验证集不同的数据增强")
print("3. 支持多进程数据加载")
print("4. 自动类别映射")
print("5. 错误处理(损坏图像)")

# 4. 数据缓存和性能优化
print("\\n4. 数据缓存和性能优化:")

class CachedDataset(Dataset):
    """带缓存的数据集"""

    def __init__(self, dataset, cache_size=1000):
        self.dataset = dataset
        self.cache = {}
        self.cache_size = cache_size

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        if idx in self.cache:
            return self.cache[idx]

        item = self.dataset[idx]

        # 如果缓存未满,添加到缓存
        if len(self.cache) < self.cache_size:
            self.cache[idx] = item

        return item

print("带缓存的数据集可以加速数据加载,特别是当数据预处理很耗时的时候")
'''

    print(practical_pipeline_code)

    return None

data_loading_pipeline()

性能提示

  • num_workers > 0 可显著提升吞吐,但过多会导致内存暴涨,建议设为 min(4, cpu_count)
  • pin_memory=True + non_blocking=True(在 .to(device, non_blocking=True) 中)是 GPU 加速的关键组合;
  • prefetch_factor=2 表示每个 worker 预取 2 个 batch,有效掩盖 I/O 延迟。

五、PyTorch训练与验证

1. 基础训练循环

一个健壮的训练循环需兼顾正确性、可观测性与容错性。以下提供从零开始的完整实现:

def basic_training_loop():
    """PyTorch基础训练循环"""

    print("=" * 80)
    print("PyTorch基础训练循环")
    print("=" * 80)

    # 1. 完整训练示例
    print("\n1. 完整训练示例:")

    training_code = '''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.model_selection import train_test_split

# 设置随机种子以确保可复现性
torch.manual_seed(42)
np.random.seed(42)

# 1. 准备数据
def prepare_data():
    """准备模拟数据"""

    n_samples = 1000
    n_features = 20
    n_classes = 3

    # 生成数据
    X = np.random.randn(n_samples, n_features).astype(np.float32)
    y = np.random.randint(0, n_classes, n_samples).astype(np.int64)

    # 划分训练集和验证集
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # 转换为PyTorch张量
    X_train_tensor = torch.from_numpy(X_train)
    y_train_tensor = torch.from_numpy(y_train)
    X_val_tensor = torch.from_numpy(X_val)
    y_val_tensor = torch.from_numpy(y_val)

    # 创建数据集和数据加载器
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    val_dataset = TensorDataset(X_val_tensor, y_val_tensor)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    return train_loader, val_loader, n_features, n_classes

# 2. 定义模型
class SimpleClassifier(nn.Module):
    """简单的分类器"""

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()

        self.model = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_size, hidden_size // 2),
            nn.BatchNorm1d(hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_size // 2, num_classes)
        )

    def forward(self, x):
        return self.model(x)

# 3. 训练函数
def train_epoch(model, dataloader, criterion, optimizer, device):
    """训练一个epoch"""

    model.train()  # 设置为训练模式
    running_loss = 0.0
    correct = 0
    total = 0

    for batch_idx, (inputs, targets) in enumerate(dataloader):
        # 移动到设备
        inputs, targets = inputs.to(device), targets.to(device)

        # 清零梯度
        optimizer.zero_grad()

        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # 反向传播
        loss.backward()

        # 梯度裁剪(防止梯度爆炸)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        # 更新参数
        optimizer.step()

        # 统计
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

        # 打印进度
        if batch_idx % 10 == 0:
            print(f'  Batch {batch_idx}/{len(dataloader)}: Loss={loss.item():.4f}')

    epoch_loss = running_loss / len(dataloader)
    epoch_acc = 100. * correct / total

    return epoch_loss, epoch_acc

# 4. 验证函数
def validate_epoch(model, dataloader, criterion, device):
    """验证一个epoch"""

    model.eval()  # 设置为评估模式
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():  # 禁用梯度计算
        for inputs, targets in dataloader:
            # 移动到设备
            inputs, targets = inputs.to(device), targets.to(device)

            # 前向传播
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            # 统计
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

    epoch_loss = running_loss / len(dataloader)
    epoch_acc = 100. * correct / total

    return epoch_loss, epoch_acc

# 5. 主训练循环
def main():
    """主训练函数"""

    print("开始训练...")
    print("=" * 50)

    # 配置
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"使用设备: {device}")

    # 准备数据
    train_loader, val_loader, n_features, n_classes = prepare_data()

    # 创建模型
    model = SimpleClassifier(
        input_size=n_features,
        hidden_size=128,
        num_classes=n_classes
    ).to(device)

    # 损失函数和优化器
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)

    # 学习率调度器
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

    # 训练参数
    num_epochs = 20
    best_val_acc = 0.0

    # 记录训练历史
    history = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': []
    }

    # 训练循环
    for epoch in range(num_epochs):
        print(f"\\nEpoch {epoch+1}/{num_epochs}")
        print("-" * 30)

        # 训练
        train_loss, train_acc = train_epoch(
            model, train_loader, criterion, optimizer, device
        )

        # 验证
        val_loss, val_acc = validate_epoch(
            model, val_loader, criterion, device
        )

        # 更新学习率
        scheduler.step()

        # 记录历史
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        # 打印结果
        print(f"训练结果: 损失={train_loss:.4f}, 准确率={train_acc:.2f}%")
        print(f"验证结果: 损失={val_loss:.4f}, 准确率={val_acc:.2f}%")
        print(f"学习率: {optimizer.param_groups[0]['lr']:.6f}")

        # 保存最佳模型
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_acc': val_acc,
            }, 'best_model.pth')
            print(f"保存最佳模型,验证准确率: {val_acc:.2f}%")

    print("\\n训练完成!")
    print(f"最佳验证准确率: {best_val_acc:.2f}%")

    return model, history

# 运行训练
model, history = main()
'''

    print(training_code)

    # 2. 高级训练技巧
    print("\n2. 高级训练技巧:")

    advanced_training_code = '''import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import autocast, GradScaler

class AdvancedTrainer:
    """高级训练器"""

    def __init__(self, model, device, use_amp=True):
        self.model = model
        self.device = device
        self.use_amp = use_amp

        # 自动混合精度训练
        if use_amp:
            self.scaler = GradScaler()
        else:
            self.scaler = None

        # 指标追踪
        self.metrics = {
            'train': {'loss': [], 'acc': []},
            'val': {'loss': [], 'acc': []}
        }

    def train_epoch(self, train_loader, criterion, optimizer):
        """训练一个epoch(支持混合精度)"""

        self.model.train()
        total_loss = 0.0
        correct = 0
        total = 0

        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs, targets = inputs.to(self.device), targets.to(self.device)

            # 清零梯度
            optimizer.zero_grad()

            # 前向传播(支持混合精度)
            if self.use_amp:
                with autocast():
                    outputs = self.model(inputs)
                    loss = criterion(outputs, targets)

                # 反向传播(缩放梯度)
                self.scaler.scale(loss).backward()

                # 取消缩放梯度并更新参数
                self.scaler.step(optimizer)
                self.scaler.update()
            else:
                outputs = self.model(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()

            # 统计
            total_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

        epoch_loss = total_loss / len(train_loader)
        epoch_acc = 100. * correct / total

        self.metrics['train']['loss'].append(epoch_loss)
        self.metrics['train']['acc'].append(epoch_acc)

        return epoch_loss, epoch_acc

    def validate_epoch(self, val_loader, criterion):
        """验证一个epoch"""

        self.model.eval()
        total_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(self.device), targets.to(self.device)

                outputs = self.model(inputs)
                loss = criterion(outputs, targets)

                total_loss += loss.item()
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()

        epoch_loss = total_loss / len(val_loader)
        epoch_acc = 100. * correct / total

        self.metrics['val']['loss'].append(epoch_loss)
        self.metrics['val']['acc'].append(epoch_acc)

        return epoch_loss, epoch_acc

    def train(self, train_loader, val_loader, criterion, optimizer, 
              scheduler=None, num_epochs=10, early_stopping_patience=5):
        """完整训练过程"""

        print(f"开始训练,共 {num_epochs} 个epoch")
        print("使用混合精度训练:", self.use_amp)

        best_val_acc = 0.0
        patience_counter = 0

        for epoch in range(num_epochs):
            print(f"\\nEpoch {epoch+1}/{num_epochs}")
            print("-" * 30)

            # 训练
            train_loss, train_acc = self.train_epoch(train_loader, criterion, optimizer)

            # 验证
            val_loss, val_acc = self.validate_epoch(val_loader, criterion)

            # 更新学习率
            if scheduler:
                scheduler.step(val_loss)

            # 打印结果
            print(f"训练: 损失={train_loss:.4f}, 准确率={train_acc:.2f}%")
            print(f"验证: 损失={val_loss:.4f}, 准确率={val_acc:.2f}%")

            # 早停检查
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                patience_counter = 0

                # 保存最佳模型
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': self.model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'val_acc': val_acc,
                    'metrics': self.metrics
                }, 'best_model_advanced.pth')
                print(f"保存最佳模型,验证准确率: {val_acc:.2f}%")
            else:
                patience_counter += 1
                if patience_counter >= early_stopping_patience:
                    print(f"早停触发,在epoch {epoch+1}停止训练")
                    break

        print("\\n训练完成!")
        print(f"最佳验证准确率: {best_val_acc:.2f}%")

        return self.metrics

# 梯度累积训练
def train_with_gradient_accumulation(model, train_loader, criterion, optimizer, 
                                    device, accumulation_steps=4):
    """梯度累积训练"""

    model.train()
    optimizer.zero_grad()  # 在开始时清零梯度

    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device)

        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # 反向传播(累积梯度)
        loss = loss / accumulation_steps  # 缩放损失
        loss.backward()

        # 每accumulation_steps步更新一次参数
        if (batch_idx + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()

            print(f"更新参数,批次: {batch_idx+1}")

    # 处理剩余的梯度
    if len(train_loader) % accumulation_steps != 0:
        optimizer.step()
        optimizer.zero_grad()

print("高级训练技巧:")
print("1. 自动混合精度训练: 加快训练速度,减少显存使用")
print("2. 梯度累积: 模拟大batch size训练")
print("3. 梯度裁剪: 防止梯度爆炸")
print("4. 早停: 防止过拟合")
print("5. 学习率调度: 动态调整学习率")
'''

    print(advanced_training_code)

    # 3. 模型评估与测试
    print("\n3. 模型评估与测试:")

    evaluation_code = '''import torch
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns

class ModelEvaluator:
    """模型评估器"""

    def __init__(self, model, device):
        self.model = model
        self.device = device

    def predict(self, dataloader):
        """批量预测"""

        self.model.eval()
        all_predictions = []
        all_targets = []
        all_probabilities = []

        with torch.no_grad():
            for inputs, targets in dataloader:
                inputs = inputs.to(self.device)

                # 前向传播
                outputs = self.model(inputs)

                # 获取预测结果
                probabilities = torch.softmax(outputs, dim=1)
                _, predictions = outputs.max(1)

                # 收集结果
                all_predictions.extend(predictions.cpu().numpy())
                all_targets.extend(targets.numpy())
                all_probabilities.extend(probabilities.cpu().numpy())

        return {
            'predictions': np.array(all_predictions),
            'targets': np.array(all_targets),
            'probabilities': np.array(all_probabilities)
        }

    def compute_metrics(self, predictions_dict):
        """计算评估指标"""

        y_true = predictions_dict['targets']
        y_pred = predictions_dict['predictions']

        metrics = {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision_macro': precision_score(y_true, y_pred, average='macro'),
            'recall_macro': recall_score(y_true, y_pred, average='macro'),
            'f1_macro': f1_score(y_true, y_pred, average='macro')
        }

        # 二分类特定指标
        if len(np.unique(y_true)) == 2:
            metrics['precision_binary'] = precision_score(y_true, y_pred)
            metrics['recall_binary'] = recall_score(y_true, y_pred)
            metrics['f1_binary'] = f1_score(y_true, y_pred)

        return metrics

    def plot_confusion_matrix(self, predictions_dict, class_names=None):
        """绘制混淆矩阵"""

        y_true = predictions_dict['targets']
        y_pred = predictions_dict['predictions']

        cm = confusion_matrix(y_true, y_pred)

        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=class_names, yticklabels=class_names)
        plt.xlabel('预测标签')
        plt.ylabel('真实标签')
        plt.title('混淆矩阵')
        plt.tight_layout()
        plt.show()

        return cm

    def print_classification_report(self, predictions_dict, class_names=None):
        """打印分类报告"""

        y_true = predictions_dict['targets']
        y_pred = predictions_dict['predictions']

        report = classification_report(y_true, y_pred, target_names=class_names)
        print("分类报告:")
        print(report)

        return report

    def plot_roc_curve(self, predictions_dict, num_classes):
        """绘制ROC曲线(多分类)"""

        from sklearn.metrics import roc_curve, auc
        from sklearn.preprocessing import label_binarize

        y_true = predictions_dict['targets']
        y_prob = predictions_dict['probabilities']

        # 二值化标签
        y_true_bin = label_binarize(y_true, classes=range(num_classes))

        # 计算每个类别的ROC曲线和AUC
        fpr = {}
        tpr = {}
        roc_auc = {}

        plt.figure(figsize=(10, 8))

        for i in range(num_classes):
            fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_prob[:, i])
            roc_auc[i] = auc(fpr[i], tpr[i])

            plt.plot(fpr[i], tpr[i], lw=2,
                     label=f'类别 {i} (AUC = {roc_auc[i]:.2f})')

        # 绘制对角线
        plt.plot([0, 1], [0, 1], 'k--', lw=2)

        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('假正率')
        plt.ylabel('真正率')
        plt.title('多分类ROC曲线')
        plt.legend(loc="lower right")
        plt.grid(alpha=0.3)
        plt.show()

        return fpr, tpr, roc_auc

    def plot_training_history(self, history):
        """绘制训练历史"""

        fig, axes = plt.subplots(1, 2, figsize=(12, 4))

        # 损失曲线
        axes[0].plot(history['train']['loss'], label='训练损失')
        axes[0].plot(history['val']['loss'], label='验证损失')
        axes[0].set_xlabel('Epoch')
        axes[0].set_ylabel('损失')
        axes[0].set_title('训练和验证损失')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)

        # 准确率曲线
        axes[1].plot(history['train']['acc'], label='训练准确率')
        axes[1].plot(history['val']['acc'], label='验证准确率')
        axes[1].set_xlabel('Epoch')
        axes[1].set_ylabel('准确率 (%)')
        axes[1].set_title('训练和验证准确率')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

# 使用示例
print("模型评估功能:")
print("1. 批量预测")
print("2. 计算多种评估指标")
print("3. 绘制混淆矩阵")
print("4. 打印分类报告")
print("5. 绘制ROC曲线")
print("6. 可视化训练历史")
'''

    print(evaluation_code)

    return None

basic_training_loop()

工程实践要点

  • clip_grad_norm_() 是防止 RNN/LSTM 训练崩溃的必备项;
  • early_stopping_patience 建议设为 5–10,避免过早终止;
  • history 字典应持久化为 JSON,便于后续分析与可视化。

六、PyTorch模型部署与生产

1. 模型保存与加载

生产环境中,模型的可移植性与版本可控性至关重要。PyTorch 提供了多层级的序列化方案。

def model_saving_loading():
    """PyTorch模型保存与加载"""

    print("=" * 80)
    print("PyTorch模型保存与加载")
    print("=" * 80)

    # 1. 基础保存与加载
    print("\n1. 基础保存与加载:")

    basic_saving_code = '''import torch
import torch.nn as nn
import torch.optim as optim

# 创建示例模型
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 20)
        self.fc2 = nn.Linear(20, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

model = SimpleModel()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 1. 保存整个模型(不推荐)
print("方法1: 保存整个模型")
torch.save(model, 'model_complete.pth')

# 加载整个模型
loaded_model = torch.load('model_complete.pth')
print(f"加载完整模型: {type(loaded_model)}")

# 2. 保存模型状态字典(推荐)
print("\\n方法2: 保存模型状态字典")
torch.save(model.state_dict(), 'model_state_dict.pth')

# 创建新模型并加载状态字典
new_model = SimpleModel()
new_model.load_state_dict(torch.load('model_state_dict.pth'))
print(f"加载状态字典到新模型: {type(new_model)}")

# 3. 保存检查点(训练中保存)
print("\\n方法3: 保存检查点")
checkpoint = {
    'epoch': 10,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': 0.05,
    'accuracy': 0.95
}
torch.save(checkpoint, 'checkpoint.pth')

# 加载检查点
loaded_checkpoint = torch.load('checkpoint.pth')
model.load_state_dict(loaded_checkpoint['model_state_dict'])
optimizer.load_state_dict(loaded_checkpoint['optimizer_state_dict'])
epoch = loaded_checkpoint['epoch']
print(f"加载检查点: epoch={epoch}, loss={loaded_checkpoint['loss']}, accuracy={loaded_checkpoint['accuracy']}")

# 4. 保存多个模型
print("\\n方法4: 保存多个模型")
ensemble_models = {
    'model1': model.state_dict(),
    'model2': SimpleModel().state_dict(),
    'metadata': {
        'created_date': '2024-01-01',
        'version': '1.0'
    }
}
torch.save(ensemble_models, 'ensemble_models.pth')
'''

    print(basic_saving_code)

    # 2. 跨设备保存与加载
    print("\n2. 跨设备保存与加载:")

    cross_device_code = '''# 跨设备保存与加载
print("跨设备模型处理:")

# 创建GPU模型(如果有GPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_gpu = SimpleModel().to(device)

# 保存GPU模型
torch.save(model_gpu.state_dict(), 'model_gpu.pth')

# 加载到CPU
model_cpu = SimpleModel()
state_dict = torch.load('model_gpu.pth', map_location=torch.device('cpu'))
model_cpu.load_state_dict(state_dict)
print(f"GPU模型加载到CPU: 成功")

# 加载到GPU
if torch.cuda.is_available():
    model_gpu2 = SimpleModel().to(device)
    state_dict = torch.load('model_gpu.pth', map_location=device)
    model_gpu2.load_state_dict(state_dict)
    print(f"GPU模型加载到GPU: 成功")

# 处理不同GPU编号
print("\\n处理不同GPU编号:")

# 保存时指定map_location
def load_model_flexible(model_path, device):
    """灵活加载模型到指定设备"""

    # 尝试加载到指定设备
    try:
        state_dict = torch.load(model_path, map_location=device)
    except:
        # 如果失败,加载到CPU然后移动到设备
        state_dict = torch.load(model_path, map_location='cpu')

    model = SimpleModel()
    model.load_state_dict(state_dict)
    model = model.to(device)

    return model

print("灵活加载模型函数已创建")
'''

    print(cross_device_code)

    # 3. TorchScript和模型序列化
    print("\n3. TorchScript和模型序列化:")

    torchscript_code = '''# TorchScript: 将PyTorch模型转换为可序列化的格式
print("TorchScript模型序列化:")

# 创建示例模型
model = SimpleModel()
model.eval()  # 转换为推理模式

# 方法1: TorchScript Tracing(跟踪)
print("方法1: Tracing")
example_input = torch.randn(1, 10)
traced_model = torch.jit.trace(model, example_input)

# 保存TorchScript模型
traced_model.save('traced_model.pt')
print(f"Tracing模型已保存: traced_model.pt")

# 加载TorchScript模型
loaded_traced = torch.jit.load('traced_model.pt')
output = loaded_traced(example_input)
print(f"加载的Tracing模型输出: {output.shape}")

# 方法2: TorchScript Scripting(脚本)
print("\\n方法2: Scripting")

class SimpleModelWithControlFlow(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 20)
        self.fc2 = nn.Linear(20, 1)
        self.threshold = 0.5

    def forward(self, x):
        x = torch.relu(self.fc1(x))

        # 控制流(Tracing无法处理)
        if x.mean() > self.threshold:
            x = self.fc2(x)
        else:
            x = self.fc2(x) * 0.5

        return x

model_with_control = SimpleModelWithControlFlow()
model_with_control.eval()

# 使用Scripting处理控制流
scripted_model = torch.jit.script(model_with_control)
scripted_model.save('scripted_model.pt')
print(f"Scripting模型已保存: scripted_model.pt")

# 方法3: TorchScript优化
print("\\n方法3: TorchScript优化")

# 融合操作(需要模型在GPU上)
if torch.cuda.is_available():
    model_cuda = SimpleModel().cuda()
    model_cuda.eval()

    # 转换为TorchScript
    traced_cuda = torch.jit.trace(model_cuda, torch.randn(1, 10).cuda())

    # 应用优化
    torch.jit.freeze(traced_cuda)  # 冻结模型
    traced_cuda = torch.jit.optimize_for_inference(traced_cuda)  # 推理优化

    traced_cuda.save('optimized_model.pt')
    print(f"优化模型已保存: optimized_model.pt")

# TorchScript的优点
print("\\nTorchScript的优点:")
print("1. 模型可序列化,不依赖Python代码")
print("2. 可以在C++中加载和运行")
print("3. 可以进行模型优化")
print("4. 支持移动端部署")
'''

    print(torchscript_code)

    # 4. 模型部署选项
    print("\n4. 模型部署选项:")

    deployment_code = '''# PyTorch模型部署选项
print("PyTorch模型部署选项:")

deployment_options = {
    'TorchServe': {
        '描述': 'PyTorch官方模型服务框架',
        '特点': ['REST API', '模型版本管理', '自动批处理', '模型监控'],
        '使用场景': '生产环境模型服务',
        '安装': 'pip install torchserve torch-model-archiver'
    },
    'ONNX Runtime': {
        '描述': '跨框架推理引擎',
        '特点': ['高性能推理', '多硬件支持', '跨平台', '量化支持'],
        '使用场景': '跨框架部署,性能关键应用',
        '转换': 'torch.onnx.export()'
    },
    'TensorRT': {
        '描述': 'NVIDIA高性能推理引擎',
        '特点': ['极致性能', '低延迟', 'INT8/FP16量化', '动态形状'],
        '使用场景': 'NVIDIA GPU上的高性能推理',
        '要求': 'NVIDIA GPU, TensorRT SDK'
    },
    'Torch Mobile': {
        '描述': '移动端部署',
        '特点': ['iOS/Android支持', '模型优化', '离线推理'],
        '使用场景': '移动应用',
        '工具': 'PyTorch Mobile, LibTorch'
    },
    'FastAPI + PyTorch': {
        '描述': '自定义API服务',
        '特点': ['灵活控制', '易于定制', 'Python生态'],
        '使用场景': '快速原型,定制需求',
        '框架': 'FastAPI, Flask'
    }
}

print("部署框架对比:")
for name, info in deployment_options.items():
    print(f"\\n{name}:")
    print(f"  描述: {info['描述']}")
    print(f"  特点: {', '.join(info['特点'])}")
    print(f"  使用场景: {info['使用场景']}")

# ONNX导出示例
print("\\nONNX导出示例:")

def export_to_onnx(model, input_shape, onnx_path='model.onnx'):
    """导出模型为ONNX格式"""

    # 设置模型为评估模式
    model.eval()

    # 创建示例输入
    dummy_input = torch.randn(*input_shape)

    # 导出ONNX
    torch.onnx.export(
        model,                         # 要导出的模型
        dummy_input,                   # 模型输入
        onnx_path,                     # 输出文件路径
        export_params=True,            # 导出参数
        opset_version=13,             # ONNX opset版本
        do_constant_folding=True,      # 常量折叠优化
        input_names=['input'],         # 输入名称
        output_names=['output'],       # 输出名称
        dynamic_axes={                 # 动态轴(批处理维度)
            'input': {0: 'batch_size'},
            'output': {0: 'batch_size'}
        }
    )

    print(f"模型已导出为: {onnx_path}")
    return onnx_path

# TorchServe部署示例
print("\\nTorchServe部署步骤:")
torchserve_steps = '''# 1. 创建模型存档
torch-model-archiver --model-name mymodel \\
                     --version 1.0 \\
                     --model-file model.py \\
                     --serialized-file model.pth \\
                     --handler image_classifier.py \\
                     --extra-files index_to_name.json

# 2. 启动TorchServe
torchserve --start --model-store model_store \\
                  --models mymodel=mymodel.mar \\
                  --ncs

# 3. 调用API
curl http://localhost:8080/predictions/mymodel -T test_image.jpg
'''

print(torchserve_steps)

print("\\n部署建议:")
print("1. 原型开发: FastAPI + PyTorch")
print("2. 生产服务: TorchServe")
print("3. 跨平台: ONNX Runtime")
print("4. 极致性能: TensorRT")
print("5. 移动端: PyTorch Mobile")
'''

    print(deployment_code)

    return None

model_saving_loading()

🚀 选型决策树

  • 若你已有 Kubernetes 集群,且需灰度发布、AB 测试、流量镜像 —— 选 TorchServe
  • 若需部署到边缘设备(Jetson、树莓派)或 Web(WebAssembly)—— 选 ONNX Runtime;
  • 若追求极致吞吐与低延迟(如广告 CTR 预估)—— 用 TensorRT + FP16;
  • 若是 iOS/Android App 内嵌 AI 能力 —— PyTorch Mobile 是最轻量选择。

PyTorch 以其动态计算图和 Pythonic 的设计哲学,成为了深度学习研究和开发的首选框架之一。通过今天的学习,你已经掌握了 PyTorch 的核心概念和基本用法。记住:PyTorch 的强大之处在于其灵活性和直观性,这使得它特别适合研究和快速原型开发。

欢迎访问 云栈社区,获取更多 人工智能智能 & 数据 & 云Python 实战资源。




上一篇:AI疲劳真实存在:工程师效率提升后为何更倦怠?行业深度探讨
下一篇:GitHub项目集锦:精选12个优质开源项目,从服务器管理到安全监控
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-23 13:00 , Processed in 0.738720 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表