一、PyTorch概述与设计哲学
1. PyTorch生态系统全览
PyTorch 不仅是一个张量计算库,更是一个覆盖研究、训练、部署、优化全链路的深度学习生态。其模块化设计让开发者能按需选用组件,既可快速验证想法,也能构建工业级服务。
以下代码可视化了 PyTorch 核心模块及其依赖关系:
import matplotlib.pyplot as plt
def visualize_pytorch_ecosystem():
    """Draw a schematic box-and-arrow diagram of the PyTorch ecosystem.

    Each component is rendered as a labelled box at a hand-tuned (x, y)
    position on a unit canvas, with arrows for dependency edges.
    Displays the figure via plt.show(); returns None.
    Relies on the module-level ``import matplotlib.pyplot as plt``.
    """
    # Component name -> headline capabilities shown under each box.
    components = {
        'PyTorch Core': ['张量计算', '自动微分', 'GPU加速'],
        'torch.nn': ['神经网络模块', '损失函数', '优化器'],
        'torchvision': ['计算机视觉', '数据集', '预训练模型'],
        'torchaudio': ['音频处理', '音频数据集', '音频模型'],
        'torchtext': ['文本处理', 'NLP数据集', '文本模型'],
        'TorchServe': ['模型部署', '推理服务', '模型管理'],
        'TorchScript': ['模型序列化', '跨语言部署', '性能优化'],
        'PyTorch Lightning': ['训练抽象', '实验管理', '可复现性'],
        'Hugging Face Transformers': ['预训练模型', 'NLP工具', '模型共享'],
        'Detectron2': ['目标检测', '实例分割', '姿态估计']
    }
    # Create the ecosystem figure; axes hidden because coordinates are
    # schematic canvas fractions, not data.
    fig, ax = plt.subplots(figsize=(14, 8))
    ax.axis('off')
    # Manually chosen layout positions (x, y) in [0, 1] canvas fractions.
    positions = {
        'PyTorch Core': (0.5, 0.8),
        'torch.nn': (0.3, 0.7),
        'torchvision': (0.2, 0.6),
        'torchaudio': (0.4, 0.6),
        'torchtext': (0.6, 0.6),
        'TorchServe': (0.8, 0.7),
        'TorchScript': (0.7, 0.5),
        'PyTorch Lightning': (0.3, 0.5),
        'Hugging Face Transformers': (0.5, 0.4),
        'Detectron2': (0.7, 0.3)
    }
    # Draw one box + title + feature list per component.
    for component, (x, y) in positions.items():
        # Box centred on (x, y): width 0.16, height 0.06.
        box = plt.Rectangle((x-0.08, y-0.03), 0.16, 0.06,
                            facecolor='#FF6B6B', edgecolor='#C44D58', alpha=0.8)
        ax.add_patch(box)
        # Component name, centred inside the box.
        ax.text(x, y, component, ha='center', va='center',
                fontsize=9, fontweight='bold', color='white')
        # Feature list just below the name.
        # NOTE(review): y-0.02 is still inside the box (box spans y±0.03),
        # so this italic text may overlap the bold title — confirm offset.
        features = components[component]
        feature_text = '\n'.join(features)
        ax.text(x, y-0.02, feature_text, ha='center', va='top',
                fontsize=7, fontstyle='italic')
    # Dependency edges: arrow from bottom edge of source box (y1+0.03 is
    # actually the top of the source; offsets are cosmetic) to target box.
    connections = [
        ('PyTorch Core', 'torch.nn'),
        ('PyTorch Core', 'torchvision'),
        ('PyTorch Core', 'torchaudio'),
        ('PyTorch Core', 'torchtext'),
        ('PyTorch Core', 'TorchServe'),
        ('PyTorch Core', 'TorchScript'),
        ('torch.nn', 'PyTorch Lightning'),
        ('torchvision', 'Detectron2'),
        ('torchtext', 'Hugging Face Transformers')
    ]
    for start, end in connections:
        x1, y1 = positions[start]
        x2, y2 = positions[end]
        ax.annotate('', xy=(x2, y2-0.03), xytext=(x1, y1+0.03),
                    arrowprops=dict(arrowstyle='->', color='gray', alpha=0.6, lw=1.5))
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.set_title('PyTorch生态系统架构', fontsize=16, fontweight='bold', color='#333333')
    plt.show()


visualize_pytorch_ecosystem()
该图清晰展示了 PyTorch Core 作为底层引擎,向上支撑各类领域专用库(如 torchvision、torchaudio),并向外延伸出生产部署(TorchServe)、跨平台推理(TorchScript)、工程化封装(Lightning)等关键能力层。
2. PyTorch vs TensorFlow 对比
选择框架不应凭直觉,而应基于项目阶段、团队能力与交付目标。下表从 8 个维度展开对比:
def compare_pytorch_tensorflow():
    """Print a feature-by-feature PyTorch vs. TensorFlow comparison.

    Builds the comparison as a pandas DataFrame (one row per feature),
    prints each row plus a short list of selection tips, and returns
    the DataFrame so callers can inspect it programmatically.
    """
    feature_names = ['动态计算图', '静态计算图', 'API风格', '调试体验', '研究友好度', '生产部署', '社区生态', '学习曲线']
    pytorch_notes = [
        'Eager模式为主\n动态构建计算图\n调试方便',
        '通过TorchScript\n支持静态图',
        'Pythonic\n面向对象\n直观易用',
        '优秀的Python\n调试器支持\n实时错误检查',
        '非常高\n适合原型开发\n和学术研究',
        '良好\nTorchServe支持\n但相对较新',
        '快速增长\n研究社区强大\n开源活跃',
        '平缓\n符合Python习惯'
    ]
    tensorflow_notes = [
        'Eager模式支持\n但传统是静态图',
        '静态图为主\n优化性能好',
        '函数式\n有时冗长\n有学习成本',
        '图模式调试\n复杂',
        '高\n但传统上更\n适合生产',
        '非常成熟\nTF Serving完善\n工具链完整',
        '庞大成熟\n工业界广泛使用\n企业支持强',
        '较陡峭\n概念较多'
    ]
    import pandas as pd
    table = pd.DataFrame({
        '特性': feature_names,
        'PyTorch': pytorch_notes,
        'TensorFlow': tensorflow_notes,
    })
    print("PyTorch vs TensorFlow 详细对比")
    print("=" * 120)
    # One multi-line entry per feature, framework answers side by side.
    for _, record in table.iterrows():
        print(f"\n{record['特性']}:")
        print(f" PyTorch: {record['PyTorch']}")
        print(f" TensorFlow: {record['TensorFlow']}")
    print("\n" + "=" * 120)
    print("\n选择建议:")
    tips = (
        "1. 学术研究/快速原型: 选择 PyTorch",
        "2. 生产部署/企业应用: 考虑 TensorFlow",
        "3. 入门学习: PyTorch 更容易上手",
        "4. 多框架掌握: 学习两者,了解各自优势",
        "5. 最新趋势: 两者都在互相借鉴,差距在缩小",
    )
    for tip in tips:
        print(tip)
    return table


compare_pytorch_tensorflow()
✅ 关键结论:
- 若你正参与高校课题、AI竞赛或需要频繁修改网络结构(如探索新型注意力机制),PyTorch 是更自然的选择;
- 若你所在团队已建立成熟的 TF Serving 运维体系,且模型迭代频率低、稳定性要求极高,TensorFlow 仍具优势;
- 值得注意的是,PyTorch 2.0+ 的
torch.compile() 和 TorchDynamo 已大幅补齐图优化短板,二者技术边界正持续模糊。
二、PyTorch安装与环境配置
1. 安装指南
安装方式直接影响后续开发效率与 GPU 利用率。推荐优先使用 conda 管理环境,避免 CUDA 版本冲突。
def setup_pytorch_environment():
    """Print an installation/configuration cheat-sheet for PyTorch.

    Walks a table of install scenarios (CPU wheel, CUDA wheel, conda,
    Docker, pinned versions, nightly), printing the command plus any
    optional metadata for each, and finishes with a copy-paste
    verification snippet. Purely informational: nothing is installed
    or executed here. Returns None.
    """
    # Scenario name -> metadata dict. Optional keys ('前提条件', '优势',
    # '验证代码') are printed only when present.
    setups = {
        '基础安装 (CPU版本)': {
            '命令': 'pip install torch torchvision torchaudio',
            '说明': '安装CPU版本,适合学习和开发',
            '验证代码': '''import torch
print(f"PyTorch版本: {torch.__version__}")
print(f"CUDA是否可用: {torch.cuda.is_available()}")
'''
        },
        'GPU支持 (CUDA 11.8)': {
            '命令': 'pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118',
            '说明': '安装CUDA 11.8支持的GPU版本',
            '前提条件': [
                'NVIDIA GPU (Compute Capability 3.5+)',
                'CUDA Toolkit 11.8',
                'cuDNN 8.6+'
            ]
        },
        'conda安装': {
            '命令': 'conda install pytorch torchvision torchaudio pytorch-cuda=11.8 -c pytorch -c nvidia',
            '说明': '使用conda安装(推荐)',
            '优势': '自动处理依赖,环境管理方便'
        },
        'Docker安装': {
            '命令': 'docker pull pytorch/pytorch:latest',
            '说明': '使用Docker容器',
            '运行命令': 'docker run -it --gpus all pytorch/pytorch:latest python'
        },
        '特定版本': {
            '命令': 'pip install torch==2.1.0 torchvision==0.16.0 torchaudio==2.1.0',
            '说明': '安装特定版本(生产环境推荐)'
        },
        '开发版本': {
            '命令': 'pip install --pre torch torchvision torchaudio --index-url https://download.pytorch.org/whl/nightly/cu118',
            '说明': '安装夜间构建版本(最新功能)'
        }
    }
    print("PyTorch安装配置指南")
    print("=" * 80)
    # Print each scenario; '运行命令' (Docker) is NOT printed below —
    # only 命令/说明 and the three optional keys are surfaced.
    for setup_type, setup_info in setups.items():
        print(f"\n{setup_type}:")
        print(f" 命令: {setup_info['命令']}")
        print(f" 说明: {setup_info['说明']}")
        if '前提条件' in setup_info:
            print(" 前提条件:")
            for req in setup_info['前提条件']:
                print(f" • {req}")
        if '优势' in setup_info:
            print(f" 优势: {setup_info['优势']}")
        if '验证代码' in setup_info:
            print(f" 验证代码: {setup_info['验证代码']}")
    print("\n" + "=" * 80)
    # Final copy-paste snippet for readers to verify their install.
    print("\n验证安装(复制运行以下代码):")
    print("""import torch
import torchvision
# 打印版本信息
print(f"PyTorch版本: {torch.__version__}")
print(f"torchvision版本: {torchvision.__version__}")
# 检查CUDA
if torch.cuda.is_available():
    print(f"CUDA可用")
    print(f"CUDA版本: {torch.version.cuda}")
    print(f"GPU数量: {torch.cuda.device_count()}")
    print(f"当前GPU: {torch.cuda.current_device()}")
    print(f"GPU名称: {torch.cuda.get_device_name(0)}")
else:
    print("CUDA不可用,使用CPU")
""")


setup_pytorch_environment()
📌 实操建议:
- 生产环境务必指定版本号(如
torch==2.1.0),避免因自动升级引发兼容性问题;
- 使用 Docker 时,推荐拉取
pytorch/pytorch:2.1.0-cuda11.8-cudnn8-runtime 镜像,而非 latest;
- 若需多版本共存,用
conda env create -f environment.yml 管理隔离环境。
2. GPU配置与优化
正确配置 GPU 是释放算力的前提。以下脚本涵盖设备检测、内存管理、混合精度与性能调优:
def configure_gpu_for_pytorch():
    """Print a reference walkthrough for configuring PyTorch GPU usage.

    The walkthrough (device detection, cuDNN flags, seeding, AMP setup,
    a matmul micro-benchmark and a memory-management demo) is emitted as
    a code listing string — it is NOT executed here — followed by a
    short troubleshooting checklist. Returns None.

    Fixes over the original listing:
      * bare ``except:`` replaced with ``except ImportError:`` so the
        AMP probe no longer swallows unrelated errors;
      * a comment now explains that the determinism settings in step 6
        deliberately override ``cudnn.benchmark = True`` from step 5.
    """
    config_code = '''import torch
import numpy as np
import time
def configure_gpu_settings():
    """配置GPU设置"""
    print("GPU配置与优化")
    print("=" * 60)
    # 1. 检查可用GPU
    device_count = torch.cuda.device_count()
    print(f"可用GPU数量: {device_count}")
    if device_count == 0:
        print("警告: 未找到GPU,使用CPU运行")
        device = torch.device('cpu')
    else:
        device = torch.device('cuda:0')
    # 2. 显示GPU信息
    for i in range(device_count):
        gpu_name = torch.cuda.get_device_name(i)
        gpu_memory = torch.cuda.get_device_properties(i).total_memory / 1e9
        print(f"GPU {i}: {gpu_name}, 显存: {gpu_memory:.1f} GB")
    # 3. 设置当前设备
    torch.cuda.set_device(0)
    print(f"当前GPU: {torch.cuda.current_device()}")
    # 4. 清空GPU缓存
    torch.cuda.empty_cache()
    # 5. 设置cuDNN基准
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.enabled = True
    # 6. 设置随机种子(确保可复现性)
    torch.manual_seed(42)
    torch.cuda.manual_seed(42)
    torch.cuda.manual_seed_all(42)
    torch.backends.cudnn.deterministic = True
    # 注意: 可复现性要求 benchmark=False, 这里有意覆盖第5步的设置;
    # 追求速度时保留第5步, 追求可复现时保留第6步, 二者取其一
    torch.backends.cudnn.benchmark = False
    # 7. 自动混合精度训练
    try:
        from torch.cuda.amp import autocast, GradScaler
        scaler = GradScaler()
        print("自动混合精度训练已启用")
    except ImportError:
        scaler = None
        print("自动混合精度训练不可用")
    return device, scaler
def check_gpu_performance(device):
    """检查GPU性能"""
    print("\\nGPU性能测试:")
    # 创建测试张量
    size = 5000
    a = torch.randn(size, size, device=device)
    b = torch.randn(size, size, device=device)
    # 预热
    _ = torch.mm(a, b)
    torch.cuda.synchronize()
    # 测量矩阵乘法时间
    times = []
    for _ in range(10):
        start = time.time()
        c = torch.mm(a, b)
        torch.cuda.synchronize() # 确保计算完成
        times.append(time.time() - start)
    avg_time = np.mean(times) * 1000 # 转换为毫秒
    print(f"矩阵乘法 ({size}x{size}) 平均耗时: {avg_time:.2f} ms")
    print(f"结果形状: {c.shape}")
    # 内存使用情况
    if device.type == 'cuda':
        memory_allocated = torch.cuda.memory_allocated() / 1e6
        memory_reserved = torch.cuda.memory_reserved() / 1e6
        print(f"已分配显存: {memory_allocated:.1f} MB")
        print(f"已保留显存: {memory_reserved:.1f} MB")
    return avg_time
def memory_management_demo():
    """内存管理演示"""
    print("\\n内存管理演示:")
    # 1. 监控内存使用
    if torch.cuda.is_available():
        # 清空缓存
        torch.cuda.empty_cache()
        # 分配大张量
        print("分配大张量...")
        big_tensor = torch.randn(10000, 10000, device='cuda')
        print(f"分配后显存: {torch.cuda.memory_allocated() / 1e6:.1f} MB")
        # 删除张量
        del big_tensor
        torch.cuda.empty_cache()
        print(f"删除后显存: {torch.cuda.memory_allocated() / 1e6:.1f} MB")
        # 2. 使用with torch.no_grad()减少内存
        print("\\n使用torch.no_grad()减少内存:")
        with torch.no_grad():
            x = torch.randn(1000, 1000, device='cuda')
            y = torch.randn(1000, 1000, device='cuda')
            z = x @ y # 不会保存计算图
        print(f"no_grad模式下显存: {torch.cuda.memory_allocated() / 1e6:.1f} MB")
# 运行配置
device, scaler = configure_gpu_settings()
check_gpu_performance(device)
memory_management_demo()
'''
    # Emit the listing, then a short FAQ-style checklist.
    print(config_code)
    print("\n常见GPU问题解决方案:")
    print("1. CUDA版本不匹配: 使用对应CUDA版本的PyTorch")
    print("2. 显存不足: 减小batch_size,使用梯度累积")
    print("3. 多GPU训练: 使用torch.nn.DataParallel或DistributedDataParallel")
    print("4. 性能优化: 启用cudnn.benchmark,使用混合精度训练")
    print("5. 内存泄漏: 及时del张量,使用torch.cuda.empty_cache()")


configure_gpu_for_pytorch()
💡 高频技巧:
torch.cuda.empty_cache() 仅把缓存池中当前未被任何张量占用的显存归还给驱动,不会释放仍在使用的显存,也不会加快 PyTorch 自身的分配速度;
torch.backends.cudnn.benchmark = True 在首次运行时自动寻找最优卷积算法,适合输入尺寸固定的场景;
- 混合精度(AMP)可将训练速度提升 1.5–3 倍,同时降低显存占用约 50%,强烈推荐开启。
三、PyTorch核心概念
1. 张量(Tensor)基础
张量是 PyTorch 的数据基石。理解其创建、属性与操作,是掌握整个框架的第一步。
import torch
import numpy as np
def tensor_basics():
    """Walk through core tensor operations with printed examples.

    Covers creation (scalar through 3-D), factory functions, tensor
    attributes, arithmetic/reshaping, broadcasting and NumPy interop.
    Returns the 2x3 float32 demo tensor used in the attributes section.
    Relies on the module-level ``torch`` and ``np`` imports.
    """
    banner = "=" * 60
    print(banner)
    print("PyTorch张量基础")
    print(banner)

    # --- 1. Creating tensors of increasing rank ------------------------
    print("\n1. 创建张量:")
    rank0 = torch.tensor(42)
    print(f"标量: {rank0}, 形状: {rank0.shape}, 数据类型: {rank0.dtype}")
    rank1 = torch.tensor([1, 2, 3, 4, 5])
    print(f"向量: {rank1}, 形状: {rank1.shape}")
    rank2 = torch.tensor([[1, 2], [3, 4], [5, 6]])
    print(f"矩阵: {rank2}, 形状: {rank2.shape}")
    rank3 = torch.tensor([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])
    print(f"3维张量: 形状: {rank3.shape}")

    # --- 2. Factory functions ------------------------------------------
    print("\n2. 特殊张量:")
    all_zero = torch.zeros(2, 3)
    print(f"零张量:\n{all_zero}")
    all_one = torch.ones(3, 2)
    print(f"一张量:\n{all_one}")
    identity = torch.eye(3)
    print(f"单位矩阵:\n{identity}")
    gauss = torch.randn(2, 2)  # standard normal
    print(f"正态分布随机张量:\n{gauss}")
    uniform = torch.rand(2, 2)  # uniform over [0, 1)
    print(f"均匀分布随机张量:\n{uniform}")

    # --- 3. Tensor attributes ------------------------------------------
    print("\n3. 张量属性:")
    demo = torch.tensor([[1, 2, 3], [4, 5, 6]], dtype=torch.float32)
    print(f"张量:\n{demo}")
    print(f"形状: {demo.shape}")
    print(f"数据类型: {demo.dtype}")
    print(f"维度数: {demo.dim()}")
    print(f"元素总数: {demo.numel()}")
    print(f"设备: {demo.device}")
    print(f"是否requires_grad: {demo.requires_grad}")
    print(f"转换为NumPy:\n{demo.numpy()}")

    # --- 4. Arithmetic and shape manipulation --------------------------
    print("\n4. 张量操作:")
    lhs = torch.tensor([[1, 2], [3, 4]], dtype=torch.float32)
    rhs = torch.tensor([[5, 6], [7, 8]], dtype=torch.float32)
    print(f"加法:\n{lhs + rhs}")
    print(f"乘法:\n{lhs * rhs}")
    print(f"矩阵乘法:\n{torch.matmul(lhs, rhs)}")
    print(f"点积:\n{torch.dot(lhs.flatten(), rhs.flatten())}")
    flat = torch.tensor([1, 2, 3, 4, 5, 6])
    grid = flat.view(2, 3)
    print(f"重塑前: {flat.shape}, 重塑后: {grid.shape}")
    flipped = grid.t()
    print(f"转置: {flipped.shape}")
    stacked = torch.cat([lhs, rhs], dim=0)
    print(f"拼接 (dim=0):\n{stacked}")

    # --- 5. Broadcasting: (3,) + (3,1) -> (3,3) ------------------------
    print("\n5. 广播机制:")
    row = torch.tensor([1, 2, 3])
    col = torch.tensor([[10], [20], [30]])
    print(f"x: {row.shape}, y: {col.shape}")
    print(f"x + y:\n{row + col}")

    # --- 6. NumPy interop (from_numpy shares memory!) ------------------
    print("\n6. 张量与NumPy互操作:")
    src = torch.randn(3, 3)
    as_np = src.numpy()
    print(f"PyTorch张量 -> NumPy数组:\n{as_np}")
    np_data = np.random.randn(3, 3)
    from_np = torch.from_numpy(np_data)
    print(f"NumPy数组 -> PyTorch张量:\n{from_np}")
    # Both wrappers point at np_data's buffer, so the pointers match.
    print(f"共享内存: {from_np.data_ptr() == torch.from_numpy(np_data).data_ptr()}")

    return demo


tensor_basics()
⚠️ 关键细节:
torch.tensor() 默认不共享内存;torch.from_numpy() 默认共享内存(修改一方会影响另一方);
view() 要求张量内存连续,否则报错;reshape() 更鲁棒,仅在无法返回视图时才会自动拷贝数据;
.item() 仅适用于单元素张量,用于提取 Python 标量值。
2. 自动微分(Autograd)
PyTorch 的 autograd 是其“动态图”灵魂所在。它让反向传播完全隐式、无需手动推导公式。
def autograd_demo():
    """Print a tutorial tour of PyTorch autograd.

    Emits five pre-written code listings (basic gradients, a small
    computation graph, gradient control, higher-order derivatives, and
    a manual linear-regression loop). The listings are printed, not
    executed. Returns None.
    """
    print("=" * 80)
    print("PyTorch自动微分 (Autograd)")
    print("=" * 80)
    # 1. Basic autodiff: scalar expression, backward(), hand-checked grads.
    print("\n1. 基本自动微分:")
    basic_code = '''# 创建需要梯度的张量
x = torch.tensor(2.0, requires_grad=True)
y = torch.tensor(3.0, requires_grad=True)
# 定义计算
z = x**2 + y**3 + x*y
print(f"x = {x.item()}, y = {y.item()}")
print(f"z = x^2 + y^3 + x*y = {z.item()}")
# 计算梯度
z.backward()
print(f"∂z/∂x = {x.grad.item()}") # 2x + y = 2*2 + 3 = 7
print(f"∂z/∂y = {y.grad.item()}") # 3y^2 + x = 3*9 + 2 = 29
# 验证
print(f"验证 ∂z/∂x: 2*{x.item()} + {y.item()} = {2*x.item() + y.item()}")
print(f"验证 ∂z/∂y: 3*{y.item()}^2 + {x.item()} = {3*y.item()**2 + x.item()}")
'''
    print(basic_code)
    # 2. A tiny linear+sigmoid graph, with a manual chain-rule check.
    print("\n2. 计算图可视化:")
    graph_code = '''# 创建更复杂的计算图
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
w = torch.tensor([0.5, 0.3, 0.2], requires_grad=True)
b = torch.tensor(1.0, requires_grad=True)
# 线性变换
y = torch.sum(x * w) + b
# 激活函数
z = torch.sigmoid(y)
print(f"输入 x: {x}")
print(f"权重 w: {w}")
print(f"偏置 b: {b}")
print(f"线性输出 y = Σ(x*w) + b = {y.item()}")
print(f"Sigmoid输出 z = σ(y) = {z.item()}")
# 计算梯度
z.backward()
print(f"\\n梯度:")
print(f"∂z/∂x = {x.grad}")
print(f"∂z/∂w = {w.grad}")
print(f"∂z/∂b = {b.grad}")
# 手动验证梯度
sigmoid_y = torch.sigmoid(y)
dz_dy = sigmoid_y * (1 - sigmoid_y) # σ'(y) = σ(y)(1-σ(y))
print(f"\\n手动验证:")
print(f"∂z/∂y = σ(y)(1-σ(y)) = {dz_dy.item()}")
print(f"∂z/∂x = ∂z/∂y * ∂y/∂x = {dz_dy.item()} * w = {dz_dy.item() * w}")
'''
    print(graph_code)
    # 3. Gradient control: no_grad, detach, accumulation, custom Function.
    print("\n3. 梯度控制:")
    control_code = '''# 1. 禁用梯度计算
print("禁用梯度计算:")
with torch.no_grad():
    x = torch.tensor([1.0, 2.0, 3.0])
    y = x * 2
    print(f"x = {x}")
    print(f"y = x * 2 = {y}")
    print(f"y.requires_grad = {y.requires_grad}")
# 2. 分离张量(从计算图中分离)
print("\\n分离张量:")
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = x * 2
z = y.detach() * 3 # y.detach() 返回不需要梯度的新张量
print(f"x.requires_grad = {x.requires_grad}")
print(f"y.requires_grad = {y.requires_grad}")
print(f"z.requires_grad = {z.requires_grad}")
# 3. 梯度累积
print("\\n梯度累积:")
x = torch.tensor(2.0, requires_grad=True)
# 多次前向传播,累积梯度
for i in range(3):
    y = x ** 2
    y.backward(retain_graph=True) # 保留计算图
    print(f"第 {i+1} 次反向传播,梯度: {x.grad.item()}")
# 清空梯度
x.grad.zero_()
print(f"清空梯度后: {x.grad.item()}")
# 4. 自定义梯度函数
print("\\n自定义梯度函数:")
class MyReLU(torch.autograd.Function):
    """自定义ReLU函数,带自定义反向传播"""
    @staticmethod
    def forward(ctx, input):
        ctx.save_for_backward(input) # 保存输入用于反向传播
        return input.clamp(min=0)
    @staticmethod
    def backward(ctx, grad_output):
        input, = ctx.saved_tensors
        grad_input = grad_output.clone()
        grad_input[input < 0] = 0 # ReLU的梯度
        return grad_input
# 使用自定义函数
x = torch.tensor([-1.0, 0.5, 2.0, -0.3], requires_grad=True)
y = MyReLU.apply(x)
print(f"输入: {x}")
print(f"MyReLU输出: {y}")
y.sum().backward()
print(f"梯度: {x.grad}")
'''
    print(control_code)
    # 4. Second derivatives via torch.autograd.grad(create_graph=True).
    print("\n4. 高阶导数:")
    high_order_code = '''# 计算二阶导数
x = torch.tensor(3.0, requires_grad=True)
y = x**3 + 2*x**2 + x + 1
# 一阶导数
first_derivative = torch.autograd.grad(y, x, create_graph=True)[0]
print(f"y = x^3 + 2x^2 + x + 1")
print(f"x = {x.item()}")
print(f"y = {y.item()}")
print(f"dy/dx = {first_derivative.item()}")
# 二阶导数
second_derivative = torch.autograd.grad(first_derivative, x)[0]
print(f"d²y/dx² = {second_derivative.item()}")
# 验证
print(f"\\n验证:")
print(f"dy/dx = 3x^2 + 4x + 1 = {3*x.item()**2 + 4*x.item() + 1}")
print(f"d²y/dx² = 6x + 4 = {6*x.item() + 4}")
'''
    print(high_order_code)
    # 5. End-to-end manual gradient descent for 1-D linear regression.
    print("\n5. 实战示例:线性回归梯度:")
    regression_code = '''# 线性回归的梯度计算
torch.manual_seed(42)
# 生成数据
n_samples = 100
X = torch.randn(n_samples, 1) # 特征
true_w = 2.5
true_b = 1.0
y = true_w * X + true_b + torch.randn(n_samples, 1) * 0.1 # 添加噪声
# 初始化参数
w = torch.tensor(0.0, requires_grad=True)
b = torch.tensor(0.0, requires_grad=True)
# 学习率
learning_rate = 0.01
# 训练循环(手动梯度下降)
print("梯度下降训练线性回归:")
for epoch in range(100):
    # 前向传播
    y_pred = w * X + b
    # 计算损失(均方误差)
    loss = torch.mean((y_pred - y) ** 2)
    # 反向传播
    loss.backward()
    # 更新参数(手动)
    with torch.no_grad():
        w -= learning_rate * w.grad
        b -= learning_rate * b.grad
    # 清空梯度
    w.grad.zero_()
    b.grad.zero_()
    if epoch % 20 == 0:
        print(f"Epoch {epoch:3d}: w = {w.item():.4f}, b = {b.item():.4f}, Loss = {loss.item():.4f}")
print(f"\\n真实参数: w = {true_w}, b = {true_b}")
print(f"学习参数: w = {w.item():.4f}, b = {b.item():.4f}")
'''
    print(regression_code)
    return None


autograd_demo()
🔍 原理精要:
requires_grad=True 是开启梯度追踪的开关,只有叶子节点(用户创建的张量)才需显式设置;
retain_graph=True 防止反向传播后释放计算图,允许多次调用 .backward();
torch.no_grad() 是推理阶段的黄金守则,可节省 30%+ 显存并提速。
四、PyTorch神经网络模块
1. torch.nn.Module基础
nn.Module 是所有神经网络的基类。掌握其生命周期(__init__ → forward)、参数注册与容器使用,是构建任意复杂模型的基础。
def nn_module_basics():
    """Print a tutorial tour of torch.nn building blocks.

    Emits five pre-written code listings: an nn.Module subclass, common
    layer types, loss functions, optimizers/schedulers, and container
    modules. The listings are printed, not executed. Returns None.

    Fix over the original: the scheduler listing used the non-existent
    attribute path ``optim.lr.scheduler.MultiStepLR`` (AttributeError if
    a reader ran it); corrected to ``optim.lr_scheduler.MultiStepLR``.
    """
    print("=" * 80)
    print("PyTorch torch.nn 模块")
    print("=" * 80)
    # 1. Defining a network as an nn.Module subclass.
    print("\n1. 基础Module类:")
    basic_module_code = '''import torch
import torch.nn as nn
import torch.nn.functional as F
class SimpleNet(nn.Module):
    """简单的全连接神经网络"""
    def __init__(self, input_size=784, hidden_size=128, output_size=10):
        super().__init__() # 必须调用父类初始化
        # 定义网络层
        self.fc1 = nn.Linear(input_size, hidden_size) # 全连接层1
        self.fc2 = nn.Linear(hidden_size, hidden_size // 2) # 全连接层2
        self.fc3 = nn.Linear(hidden_size // 2, output_size) # 输出层
        # Dropout层
        self.dropout = nn.Dropout(p=0.2)
        # BatchNorm层
        self.batchnorm = nn.BatchNorm1d(hidden_size // 2)
    def forward(self, x):
        """前向传播"""
        # 展平输入(如果是图像)
        x = x.view(x.size(0), -1)
        # 第一层 + ReLU激活 + Dropout
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        # 第二层 + ReLU激活 + BatchNorm
        x = F.relu(self.fc2(x))
        x = self.batchnorm(x)
        x = self.dropout(x)
        # 输出层(无激活函数,用于分类)
        x = self.fc3(x)
        return x
# 创建模型实例
model = SimpleNet(input_size=784, hidden_size=256, output_size=10)
print("模型结构:")
print(model)
# 前向传播
x = torch.randn(32, 784) # 批量大小32,特征784
output = model(x)
print(f"\\n输入形状: {x.shape}")
print(f"输出形状: {output.shape}")
# 模型参数
print(f"\\n模型参数数量: {sum(p.numel() for p in model.parameters())}")
print(f"可训练参数数量: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")
# 访问特定层
print(f"\\n第一层权重形状: {model.fc1.weight.shape}")
print(f"第一层偏置形状: {model.fc1.bias.shape}")
'''
    print(basic_module_code)
    # 2. Catalogue of common layers and activations.
    print("\n2. 常用层类型:")
    layers_code = '''# 常用层类型示例
layers_examples = {
    '线性层': nn.Linear(in_features=10, out_features=5),
    '卷积层': nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3),
    '池化层': nn.MaxPool2d(kernel_size=2, stride=2),
    '循环层': nn.LSTM(input_size=10, hidden_size=20, num_layers=2),
    '批归一化': nn.BatchNorm2d(num_features=16),
    'Dropout': nn.Dropout(p=0.5),
    '嵌入层': nn.Embedding(num_embeddings=1000, embedding_dim=50),
    '转置卷积': nn.ConvTranspose2d(in_channels=16, out_channels=3, kernel_size=3),
    '层归一化': nn.LayerNorm(normalized_shape=128),
    '实例归一化': nn.InstanceNorm2d(num_features=16),
    '组归一化': nn.GroupNorm(num_groups=4, num_channels=16),
    '自适应池化': nn.AdaptiveAvgPool2d(output_size=(7, 7))
}
print("常用神经网络层:")
for name, layer in layers_examples.items():
    print(f" • {name}: {layer}")
    if hasattr(layer, 'weight'):
        print(f" 权重形状: {layer.weight.shape if hasattr(layer.weight, 'shape') else 'N/A'}")
# 激活函数
activations = {
    'ReLU': nn.ReLU(),
    'LeakyReLU': nn.LeakyReLU(negative_slope=0.01),
    'Sigmoid': nn.Sigmoid(),
    'Tanh': nn.Tanh(),
    'Softmax': nn.Softmax(dim=1),
    'LogSoftmax': nn.LogSoftmax(dim=1),
    'GELU': nn.GELU(),
    'SiLU': nn.SiLU(), # 也称为Swish
    'Mish': nn.Mish(),
    'ELU': nn.ELU(alpha=1.0)
}
print("\\n常用激活函数:")
for name, activation in activations.items():
    print(f" • {name}: {activation}")
'''
    print(layers_code)
    # 3. Catalogue of loss functions plus two usage examples.
    print("\n3. 损失函数:")
    loss_functions_code = '''# 常用损失函数
loss_functions = {
    '均方误差': nn.MSELoss(),
    '平均绝对误差': nn.L1Loss(),
    '交叉熵损失': nn.CrossEntropyLoss(),
    '二元交叉熵': nn.BCELoss(),
    '带logits的二元交叉熵': nn.BCEWithLogitsLoss(),
    '负对数似然': nn.NLLLoss(),
    'KL散度': nn.KLDivLoss(),
    'Huber损失': nn.SmoothL1Loss(),
    '余弦相似度': nn.CosineEmbeddingLoss(),
    'MarginRanking损失': nn.MarginRankingLoss(),
    '多标签Margin损失': nn.MultiLabelMarginLoss(),
    'Triplet损失': nn.TripletMarginLoss(),
    'CTCLoss': nn.CTCLoss() # 连接主义时序分类
}
print("常用损失函数:")
for name, loss_fn in loss_functions.items():
    print(f" • {name}: {loss_fn}")
# 损失函数使用示例
print("\\n损失函数使用示例:")
# 分类问题
criterion_ce = nn.CrossEntropyLoss()
outputs = torch.randn(4, 3) # 4个样本,3个类别
targets = torch.tensor([0, 2, 1, 0]) # 真实标签
loss = criterion_ce(outputs, targets)
print(f"交叉熵损失: {loss.item():.4f}")
# 回归问题
criterion_mse = nn.MSELoss()
predictions = torch.tensor([1.2, 2.3, 3.4])
targets_reg = torch.tensor([1.0, 2.0, 3.0])
loss_mse = criterion_mse(predictions, targets_reg)
print(f"均方误差损失: {loss_mse.item():.4f}")
'''
    print(loss_functions_code)
    # 4. Optimizers and LR schedulers (MultiStepLR path corrected here).
    print("\n4. 优化器:")
    optimizers_code = '''import torch.optim as optim
# 常用优化器
optimizers = {
    'SGD': optim.SGD,
    'Momentum': lambda params: optim.SGD(params, momentum=0.9),
    'Adam': optim.Adam,
    'AdamW': optim.AdamW,
    'RMSprop': optim.RMSprop,
    'Adagrad': optim.Adagrad,
    'Adadelta': optim.Adadelta,
    'Adamax': optim.Adamax,
    'NAdam': optim.NAdam,
    'RAdam': optim.RAdam
}
print("常用优化器:")
for name, optimizer_class in optimizers.items():
    print(f" • {name}")
# 优化器使用示例
# 创建简单模型
model = nn.Sequential(
    nn.Linear(10, 20),
    nn.ReLU(),
    nn.Linear(20, 1)
)
# 创建优化器
optimizer_adam = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999))
optimizer_sgd = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
print(f"\\nAdam优化器参数:")
for param_group in optimizer_adam.param_groups:
    print(f" 学习率: {param_group['lr']}")
    print(f" betas: {param_group['betas']}")
print(f"\\nSGD优化器参数:")
for param_group in optimizer_sgd.param_groups:
    print(f" 学习率: {param_group['lr']}")
    print(f" 动量: {param_group['momentum']}")
# 学习率调度器
schedulers = {
    'StepLR': optim.lr_scheduler.StepLR(optimizer_adam, step_size=30, gamma=0.1),
    'MultiStepLR': optim.lr_scheduler.MultiStepLR(optimizer_adam, milestones=[30, 80], gamma=0.1),
    'ExponentialLR': optim.lr_scheduler.ExponentialLR(optimizer_adam, gamma=0.95),
    'CosineAnnealingLR': optim.lr_scheduler.CosineAnnealingLR(optimizer_adam, T_max=100),
    'ReduceLROnPlateau': optim.lr_scheduler.ReduceLROnPlateau(optimizer_adam, mode='min', patience=5),
    'CyclicLR': optim.lr_scheduler.CyclicLR(optimizer_adam, base_lr=0.001, max_lr=0.01, step_size_up=20)
}
print("\\n学习率调度器:")
for name, scheduler in schedulers.items():
    print(f" • {name}: {scheduler}")
'''
    print(optimizers_code)
    # 5. Container modules: Sequential, ModuleList, ModuleDict, Parameters.
    print("\n5. 容器模块:")
    containers_code = '''# 容器模块:组合多个模块
print("容器模块示例:")
# 1. Sequential - 顺序容器
sequential_model = nn.Sequential(
    nn.Conv2d(3, 16, kernel_size=3, padding=1),
    nn.BatchNorm2d(16),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Conv2d(16, 32, kernel_size=3, padding=1),
    nn.BatchNorm2d(32),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Flatten(),
    nn.Linear(32 * 8 * 8, 128),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(128, 10)
)
print("Sequential模型:")
print(sequential_model)
# 测试Sequential模型
x = torch.randn(4, 3, 32, 32) # 4个样本,3通道,32x32
output = sequential_model(x)
print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")
# 2. ModuleList - 模块列表
class ModuleListNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(10, 20),
            nn.Linear(20, 30),
            nn.Linear(30, 40)
        ])
        self.activations = nn.ModuleList([
            nn.ReLU(),
            nn.ReLU(),
            nn.Sigmoid()
        ])
    def forward(self, x):
        for layer, activation in zip(self.layers, self.activations):
            x = activation(layer(x))
        return x
modulelist_model = ModuleListNet()
print(f"\\nModuleList模型参数数量: {sum(p.numel() for p in modulelist_model.parameters())}")
# 3. ModuleDict - 模块字典
class ModuleDictNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.operations = nn.ModuleDict({
            'conv1': nn.Conv2d(3, 16, 3),
            'conv2': nn.Conv2d(16, 32, 3),
            'pool': nn.MaxPool2d(2),
            'fc': nn.Linear(32 * 14 * 14, 10)
        })
    def forward(self, x):
        x = self.operations['conv1'](x)
        x = self.operations['pool'](x)
        x = self.operations['conv2'](x)
        x = self.operations['pool'](x)
        x = x.view(x.size(0), -1)
        x = self.operations['fc'](x)
        return x
moduledict_model = ModuleDictNet()
print(f"ModuleDict模型参数数量: {sum(p.numel() for p in moduledict_model.parameters())}")
# 4. 参数容器
class ParameterNet(nn.Module):
    def __init__(self):
        super().__init__()
        # 参数张量(自动注册为参数)
        self.weight = nn.Parameter(torch.randn(10, 20))
        self.bias = nn.Parameter(torch.zeros(20))
        # ParameterList
        self.weights_list = nn.ParameterList([
            nn.Parameter(torch.randn(10, 10)),
            nn.Parameter(torch.randn(10, 10))
        ])
        # ParameterDict
        self.weights_dict = nn.ParameterDict({
            'weight1': nn.Parameter(torch.randn(10, 10)),
            'weight2': nn.Parameter(torch.randn(10, 10))
        })
    def forward(self, x):
        return x @ self.weight + self.bias
param_model = ParameterNet()
print(f"\\nParameter模型参数数量: {sum(p.numel() for p in param_model.parameters())}")
'''
    print(containers_code)
    return None


nn_module_basics()
🧠 设计哲学:
nn.Sequential 适合线性流程,但无法表达分支、循环等复杂拓扑;
nn.ModuleList / nn.ModuleDict 是动态网络(如 NAS、动态路由)的基石;
nn.Parameter 显式声明可学习参数,是实现自定义层的核心。
2. 数据加载与处理
高效的数据管道是训练稳定性的保障。torch.utils.data 提供了灵活、可扩展的抽象。
def data_loading_pipeline():
"""PyTorch数据加载与处理"""
print("=" * 80)
print("PyTorch数据加载与处理")
print("=" * 80)
# 1. 基础Dataset和DataLoader
print("\n1. 基础Dataset和DataLoader:")
basic_dataloader_code = '''import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset
import numpy as np
# 1.1 使用TensorDataset(简单情况)
print("TensorDataset示例:")
# 创建模拟数据
x = torch.randn(1000, 10) # 1000个样本,10个特征
y = torch.randint(0, 2, (1000, 1)).float() # 二分类标签
# 创建TensorDataset
dataset = TensorDataset(x, y)
print(f"数据集大小: {len(dataset)}")
print(f"样本形状: {dataset[0][0].shape}, 标签形状: {dataset[0][1].shape}")
# 创建DataLoader
dataloader = DataLoader(
dataset,
batch_size=32,
shuffle=True,
num_workers=2, # 并行加载数据的工作进程数
pin_memory=True # 如果使用GPU,可以加速数据传输
)
print(f"DataLoader批次数量: {len(dataloader)}")
# 遍历DataLoader
for batch_idx, (batch_x, batch_y) in enumerate(dataloader):
if batch_idx == 0:
print(f"第一个批次: 输入形状: {batch_x.shape}, 标签形状: {batch_y.shape}")
break
# 1.2 自定义Dataset
print("\\n自定义Dataset示例:")
class CustomDataset(Dataset):
"""自定义数据集类"""
def __init__(self, data_path, transform=None):
"""
初始化数据集
Args:
data_path: 数据路径
transform: 数据转换函数
"""
# 这里应该加载数据
# 为了示例,我们生成随机数据
self.data = torch.randn(1000, 3, 32, 32) # 1000张32x32 RGB图像
self.labels = torch.randint(0, 10, (1000,)) # 10个类别
self.transform = transform
def __len__(self):
"""返回数据集大小"""
return len(self.data)
def __getitem__(self, idx):
"""获取单个样本"""
image = self.data[idx]
label = self.labels[idx]
# 应用变换(如果有)
if self.transform:
image = self.transform(image)
return image, label
# 创建自定义数据集
custom_dataset = CustomDataset("fake_path")
print(f"自定义数据集大小: {len(custom_dataset)}")
# 2. 数据变换(Transforms)
print("\\n2. 数据变换(Transforms):")
import torchvision.transforms as transforms
# 图像数据变换
image_transforms = transforms.Compose([
transforms.Resize((224, 224)), # 调整大小
transforms.RandomHorizontalFlip(p=0.5), # 随机水平翻转
transforms.RandomRotation(degrees=15), # 随机旋转
transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1), # 颜色抖动
transforms.ToTensor(), # 转换为张量
transforms.Normalize(mean=[0.485, 0.456, 0.406], # 归一化(ImageNet统计)
std=[0.229, 0.224, 0.225])
])
print("图像变换管道:")
for transform in image_transforms.transforms:
print(f" • {transform}")
# 文本数据变换示例
print("\\n文本数据变换示例:")
text_transforms = transforms.Compose([
# 在实际中,这里可能是tokenizer
lambda x: x.lower(), # 转换为小写
lambda x: x.strip(), # 去除空白
# 添加更多文本处理步骤...
])
# 3. torchvision数据集
print("\\n3. torchvision内置数据集:")
# 注意:实际使用时需要下载数据
try:
import torchvision
from torchvision import datasets
# MNIST数据集示例
mnist_transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
# 训练集
mnist_train = datasets.MNIST(
root='./data',
train=True,
download=True, # 如果数据不存在则下载
transform=mnist_transform
)
# 测试集
mnist_test = datasets.MNIST(
root='./data',
train=False,
download=True,
transform=mnist_transform
)
print(f"MNIST训练集大小: {len(mnist_train)}")
print(f"MNIST测试集大小: {len(mnist_test)}")
# 其他常用数据集
datasets_list = [
('CIFAR-10', datasets.CIFAR10),
('CIFAR-100', datasets.CIFAR100),
('ImageNet', datasets.ImageNet),
('FashionMNIST', datasets.FashionMNIST),
('COCO', datasets.CocoDetection),
('VOC', datasets.VOCDetection)
]
print("\\n其他torchvision数据集:")
for name, dataset_class in datasets_list:
print(f" • {name}")
except ImportError:
print("torchvision不可用,请安装: pip install torchvision")
'''
print(basic_dataloader_code)
# 2. 高级数据加载技巧
print("\n2. 高级数据加载技巧:")
advanced_dataloader_code = '''# 1. 自定义collate_fn
print("自定义collate_fn示例:")
def custom_collate_fn(batch):
"""自定义批次处理函数"""
# batch是列表,元素为(__getitem__返回的样本, 标签)
images = []
labels = []
additional_info = []
for item in batch:
# 假设每个样本返回 (image, label, info)
image, label, info = item
images.append(image)
labels.append(label)
additional_info.append(info)
# 堆叠张量
images = torch.stack(images, dim=0)
labels = torch.tensor(labels)
return images, labels, additional_info
# 2. 采样器(Sampler)
from torch.utils.data import Sampler, RandomSampler, SequentialSampler, WeightedRandomSampler
print("\\n采样器示例:")
# 随机采样器
random_sampler = RandomSampler(mnist_train)
print(f"随机采样器: {random_sampler}")
# 顺序采样器
sequential_sampler = SequentialSampler(mnist_train)
print(f"顺序采样器: {sequential_sampler}")
# 加权随机采样器(用于处理类别不平衡)
weights = [1.0] * len(mnist_train) # 这里应该是每个样本的权重
weighted_sampler = WeightedRandomSampler(weights, num_samples=1000, replacement=True)
print(f"加权随机采样器: {weighted_sampler}")
# 3. 批采样器(BatchSampler)
from torch.utils.data import BatchSampler
batch_sampler = BatchSampler(random_sampler, batch_size=32, drop_last=False)
print(f"批采样器: {batch_sampler}")
# 4. 分布式采样器
from torch.utils.data.distributed import DistributedSampler
# 分布式采样器(用于多GPU训练)
# distributed_sampler = DistributedSampler(mnist_train)
# print(f"分布式采样器: {distributed_sampler}")
# 5. DataLoader高级参数
advanced_dataloader = DataLoader(
mnist_train,
batch_size=64,
shuffle=True,
num_workers=4, # 并行工作进程数
pin_memory=True, # 加速GPU数据传输
prefetch_factor=2, # 预取因子
persistent_workers=True, # 保持工作进程活跃
drop_last=True # 丢弃最后一个不完整的批次
)
print(f"\\nDataLoader高级参数:")
print(f" batch_size: {advanced_dataloader.batch_size}")
print(f" num_workers: {advanced_dataloader.num_workers}")
print(f" pin_memory: {advanced_dataloader.pin_memory}")
print(f" drop_last: {advanced_dataloader.drop_last}")
# 6. 迭代DataLoader的多种方式
print("\\n迭代DataLoader的多种方式:")
# 方式1: 直接迭代
print("方式1: 直接迭代")
for batch in advanced_dataloader:
images, labels = batch
print(f"批次形状: {images.shape}")
break
# 方式2: 使用enumerate获取批次索引
print("\\n方式2: 使用enumerate")
for batch_idx, (images, labels) in enumerate(advanced_dataloader):
if batch_idx == 0:
print(f"批次 {batch_idx}: {images.shape}")
break
# 方式3: 使用tqdm显示进度条
print("\\n方式3: 使用tqdm进度条")
try:
from tqdm import tqdm
# 创建进度条
pbar = tqdm(advanced_dataloader, desc="训练", total=len(advanced_dataloader))
for images, labels in pbar:
# 在这里进行训练
# pbar.set_description(f"训练 (Loss: {loss:.4f})")
break
print("使用tqdm显示进度条")
except ImportError:
print("tqdm未安装,使用: pip install tqdm")
'''
print(advanced_dataloader_code)
# 3. 实战示例:完整数据管道
print("\n3. 实战示例:完整数据管道:")
practical_pipeline_code = '''import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from PIL import Image
import os
class ImageClassificationDataset(Dataset):
"""图像分类数据集"""
def __init__(self, root_dir, transform=None, split='train'):
"""
初始化数据集
Args:
root_dir: 数据根目录
transform: 数据变换
split: 'train' 或 'val'
"""
self.root_dir = root_dir
self.transform = transform
self.split = split
# 组织数据
self.image_paths = []
self.labels = []
self.class_to_idx = {}
# 假设目录结构为:
# root_dir/
# train/
# class1/
# img1.jpg
# img2.jpg
# class2/
# img1.jpg
# val/
# class1/
# class2/
split_dir = os.path.join(root_dir, split)
if os.path.exists(split_dir):
# 获取类别
classes = sorted(os.listdir(split_dir))
self.class_to_idx = {cls_name: idx for idx, cls_name in enumerate(classes)}
# 遍历所有类别
for class_name in classes:
class_dir = os.path.join(split_dir, class_name)
class_idx = self.class_to_idx[class_name]
# 遍历类别的所有图像
for img_name in os.listdir(class_dir):
if img_name.lower().endswith(('.png', '.jpg', '.jpeg')):
img_path = os.path.join(class_dir, img_name)
self.image_paths.append(img_path)
self.labels.append(class_idx)
else:
print(f"警告: 目录 {split_dir} 不存在")
def __len__(self):
return len(self.image_paths)
def __getitem__(self, idx):
img_path = self.image_paths[idx]
label = self.labels[idx]
# 加载图像
try:
image = Image.open(img_path).convert('RGB')
except Exception as e:
print(f"无法加载图像 {img_path}: {e}")
# 返回占位符
image = Image.new('RGB', (224, 224), color='white')
# 应用变换
if self.transform:
image = self.transform(image)
return image, label
def get_class_distribution(self):
"""获取类别分布"""
from collections import Counter
return Counter(self.labels)
# 数据增强变换
train_transform = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.RandomRotation(20),
transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# 验证集变换(不需要数据增强)
val_transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
def create_data_loaders(data_dir, batch_size=32, num_workers=4):
"""创建数据加载器"""
# 创建数据集
train_dataset = ImageClassificationDataset(
root_dir=data_dir,
transform=train_transform,
split='train'
)
val_dataset = ImageClassificationDataset(
root_dir=data_dir,
transform=val_transform,
split='val'
)
print(f"训练集大小: {len(train_dataset)}")
print(f"验证集大小: {len(val_dataset)}")
print(f"类别数量: {len(train_dataset.class_to_idx)}")
# 创建数据加载器
train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=num_workers,
pin_memory=True,
drop_last=True
)
val_loader = DataLoader(
val_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=num_workers,
pin_memory=True
)
return train_loader, val_loader, train_dataset.class_to_idx
# 使用示例
print("完整数据管道示例:")
print("=" * 50)
# 注意:这里使用虚拟目录,实际使用时需要真实数据
# train_loader, val_loader, class_to_idx = create_data_loaders('./data')
print("数据管道创建完成!")
print("包含以下功能:")
print("1. 从目录结构自动加载图像和标签")
print("2. 训练集和验证集不同的数据增强")
print("3. 支持多进程数据加载")
print("4. 自动类别映射")
print("5. 错误处理(损坏图像)")
# 4. 数据缓存和性能优化
print("\\n4. 数据缓存和性能优化:")
class CachedDataset(Dataset):
"""带缓存的数据集"""
def __init__(self, dataset, cache_size=1000):
self.dataset = dataset
self.cache = {}
self.cache_size = cache_size
def __len__(self):
return len(self.dataset)
def __getitem__(self, idx):
if idx in self.cache:
return self.cache[idx]
item = self.dataset[idx]
# 如果缓存未满,添加到缓存
if len(self.cache) < self.cache_size:
self.cache[idx] = item
return item
print("带缓存的数据集可以加速数据加载,特别是当数据预处理很耗时的时候")
'''
print(practical_pipeline_code)
return None
# Run the data-loading walkthrough (prints tutorial listings; returns None).
data_loading_pipeline()
⚡ 性能提示:
num_workers > 0 可显著提升吞吐,但过多会导致内存暴涨,建议设为 min(4, cpu_count);
pin_memory=True + non_blocking=True(在 .to(device, non_blocking=True) 中)是 GPU 加速的关键组合;
prefetch_factor=2 表示每个 worker 预取 2 个 batch,有效掩盖 I/O 延迟。
五、PyTorch训练与验证
1. 基础训练循环
一个健壮的训练循环需兼顾正确性、可观测性与容错性。以下提供从零开始的完整实现:
def basic_training_loop():
    """Print tutorial listings for PyTorch training and validation loops.

    Nothing is trained here: the function only prints three large code
    listings (stored as triple-quoted strings) covering (1) a complete
    train/validate loop, (2) advanced techniques — AMP, gradient
    accumulation, early stopping — and (3) model-evaluation utilities.

    Returns:
        None.
    """
    print("=" * 80)
    print("PyTorch基础训练循环")
    print("=" * 80)
    # 1. Complete training example (printed as text, not executed)
    print("\n1. 完整训练示例:")
    training_code = '''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.model_selection import train_test_split
# 设置随机种子以确保可复现性
torch.manual_seed(42)
np.random.seed(42)
# 1. 准备数据
def prepare_data():
"""准备模拟数据"""
n_samples = 1000
n_features = 20
n_classes = 3
# 生成数据
X = np.random.randn(n_samples, n_features).astype(np.float32)
y = np.random.randint(0, n_classes, n_samples).astype(np.int64)
# 划分训练集和验证集
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# 转换为PyTorch张量
X_train_tensor = torch.from_numpy(X_train)
y_train_tensor = torch.from_numpy(y_train)
X_val_tensor = torch.from_numpy(X_val)
y_val_tensor = torch.from_numpy(y_val)
# 创建数据集和数据加载器
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
return train_loader, val_loader, n_features, n_classes
# 2. 定义模型
class SimpleClassifier(nn.Module):
"""简单的分类器"""
def __init__(self, input_size, hidden_size, num_classes):
super().__init__()
self.model = nn.Sequential(
nn.Linear(input_size, hidden_size),
nn.BatchNorm1d(hidden_size),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(hidden_size, hidden_size // 2),
nn.BatchNorm1d(hidden_size // 2),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(hidden_size // 2, num_classes)
)
def forward(self, x):
return self.model(x)
# 3. 训练函数
def train_epoch(model, dataloader, criterion, optimizer, device):
"""训练一个epoch"""
model.train() # 设置为训练模式
running_loss = 0.0
correct = 0
total = 0
for batch_idx, (inputs, targets) in enumerate(dataloader):
# 移动到设备
inputs, targets = inputs.to(device), targets.to(device)
# 清零梯度
optimizer.zero_grad()
# 前向传播
outputs = model(inputs)
loss = criterion(outputs, targets)
# 反向传播
loss.backward()
# 梯度裁剪(防止梯度爆炸)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# 更新参数
optimizer.step()
# 统计
running_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
# 打印进度
if batch_idx % 10 == 0:
print(f' Batch {batch_idx}/{len(dataloader)}: Loss={loss.item():.4f}')
epoch_loss = running_loss / len(dataloader)
epoch_acc = 100. * correct / total
return epoch_loss, epoch_acc
# 4. 验证函数
def validate_epoch(model, dataloader, criterion, device):
"""验证一个epoch"""
model.eval() # 设置为评估模式
running_loss = 0.0
correct = 0
total = 0
with torch.no_grad(): # 禁用梯度计算
for inputs, targets in dataloader:
# 移动到设备
inputs, targets = inputs.to(device), targets.to(device)
# 前向传播
outputs = model(inputs)
loss = criterion(outputs, targets)
# 统计
running_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
epoch_loss = running_loss / len(dataloader)
epoch_acc = 100. * correct / total
return epoch_loss, epoch_acc
# 5. 主训练循环
def main():
"""主训练函数"""
print("开始训练...")
print("=" * 50)
# 配置
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")
# 准备数据
train_loader, val_loader, n_features, n_classes = prepare_data()
# 创建模型
model = SimpleClassifier(
input_size=n_features,
hidden_size=128,
num_classes=n_classes
).to(device)
# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
# 学习率调度器
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)
# 训练参数
num_epochs = 20
best_val_acc = 0.0
# 记录训练历史
history = {
'train_loss': [],
'train_acc': [],
'val_loss': [],
'val_acc': []
}
# 训练循环
for epoch in range(num_epochs):
print(f"\\nEpoch {epoch+1}/{num_epochs}")
print("-" * 30)
# 训练
train_loss, train_acc = train_epoch(
model, train_loader, criterion, optimizer, device
)
# 验证
val_loss, val_acc = validate_epoch(
model, val_loader, criterion, device
)
# 更新学习率
scheduler.step()
# 记录历史
history['train_loss'].append(train_loss)
history['train_acc'].append(train_acc)
history['val_loss'].append(val_loss)
history['val_acc'].append(val_acc)
# 打印结果
print(f"训练结果: 损失={train_loss:.4f}, 准确率={train_acc:.2f}%")
print(f"验证结果: 损失={val_loss:.4f}, 准确率={val_acc:.2f}%")
print(f"学习率: {optimizer.param_groups[0]['lr']:.6f}")
# 保存最佳模型
if val_acc > best_val_acc:
best_val_acc = val_acc
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'val_acc': val_acc,
}, 'best_model.pth')
print(f"保存最佳模型,验证准确率: {val_acc:.2f}%")
print("\\n训练完成!")
print(f"最佳验证准确率: {best_val_acc:.2f}%")
return model, history
# 运行训练
model, history = main()
'''
    print(training_code)
    # 2. Advanced training techniques listing (AMP, early stopping, accumulation)
    print("\n2. 高级训练技巧:")
    advanced_training_code = '''import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import autocast, GradScaler
class AdvancedTrainer:
"""高级训练器"""
def __init__(self, model, device, use_amp=True):
self.model = model
self.device = device
self.use_amp = use_amp
# 自动混合精度训练
if use_amp:
self.scaler = GradScaler()
else:
self.scaler = None
# 指标追踪
self.metrics = {
'train': {'loss': [], 'acc': []},
'val': {'loss': [], 'acc': []}
}
def train_epoch(self, train_loader, criterion, optimizer):
"""训练一个epoch(支持混合精度)"""
self.model.train()
total_loss = 0.0
correct = 0
total = 0
for batch_idx, (inputs, targets) in enumerate(train_loader):
inputs, targets = inputs.to(self.device), targets.to(self.device)
# 清零梯度
optimizer.zero_grad()
# 前向传播(支持混合精度)
if self.use_amp:
with autocast():
outputs = self.model(inputs)
loss = criterion(outputs, targets)
# 反向传播(缩放梯度)
self.scaler.scale(loss).backward()
# 取消缩放梯度并更新参数
self.scaler.step(optimizer)
self.scaler.update()
else:
outputs = self.model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
# 统计
total_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
epoch_loss = total_loss / len(train_loader)
epoch_acc = 100. * correct / total
self.metrics['train']['loss'].append(epoch_loss)
self.metrics['train']['acc'].append(epoch_acc)
return epoch_loss, epoch_acc
def validate_epoch(self, val_loader, criterion):
"""验证一个epoch"""
self.model.eval()
total_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
for inputs, targets in val_loader:
inputs, targets = inputs.to(self.device), targets.to(self.device)
outputs = self.model(inputs)
loss = criterion(outputs, targets)
total_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
epoch_loss = total_loss / len(val_loader)
epoch_acc = 100. * correct / total
self.metrics['val']['loss'].append(epoch_loss)
self.metrics['val']['acc'].append(epoch_acc)
return epoch_loss, epoch_acc
def train(self, train_loader, val_loader, criterion, optimizer,
scheduler=None, num_epochs=10, early_stopping_patience=5):
"""完整训练过程"""
print(f"开始训练,共 {num_epochs} 个epoch")
print("使用混合精度训练:", self.use_amp)
best_val_acc = 0.0
patience_counter = 0
for epoch in range(num_epochs):
print(f"\\nEpoch {epoch+1}/{num_epochs}")
print("-" * 30)
# 训练
train_loss, train_acc = self.train_epoch(train_loader, criterion, optimizer)
# 验证
val_loss, val_acc = self.validate_epoch(val_loader, criterion)
# 更新学习率
if scheduler:
scheduler.step(val_loss)
# 打印结果
print(f"训练: 损失={train_loss:.4f}, 准确率={train_acc:.2f}%")
print(f"验证: 损失={val_loss:.4f}, 准确率={val_acc:.2f}%")
# 早停检查
if val_acc > best_val_acc:
best_val_acc = val_acc
patience_counter = 0
# 保存最佳模型
torch.save({
'epoch': epoch,
'model_state_dict': self.model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'val_acc': val_acc,
'metrics': self.metrics
}, 'best_model_advanced.pth')
print(f"保存最佳模型,验证准确率: {val_acc:.2f}%")
else:
patience_counter += 1
if patience_counter >= early_stopping_patience:
print(f"早停触发,在epoch {epoch+1}停止训练")
break
print("\\n训练完成!")
print(f"最佳验证准确率: {best_val_acc:.2f}%")
return self.metrics
# 梯度累积训练
def train_with_gradient_accumulation(model, train_loader, criterion, optimizer,
device, accumulation_steps=4):
"""梯度累积训练"""
model.train()
optimizer.zero_grad() # 在开始时清零梯度
for batch_idx, (inputs, targets) in enumerate(train_loader):
inputs, targets = inputs.to(device), targets.to(device)
# 前向传播
outputs = model(inputs)
loss = criterion(outputs, targets)
# 反向传播(累积梯度)
loss = loss / accumulation_steps # 缩放损失
loss.backward()
# 每accumulation_steps步更新一次参数
if (batch_idx + 1) % accumulation_steps == 0:
optimizer.step()
optimizer.zero_grad()
print(f"更新参数,批次: {batch_idx+1}")
# 处理剩余的梯度
if len(train_loader) % accumulation_steps != 0:
optimizer.step()
optimizer.zero_grad()
print("高级训练技巧:")
print("1. 自动混合精度训练: 加快训练速度,减少显存使用")
print("2. 梯度累积: 模拟大batch size训练")
print("3. 梯度裁剪: 防止梯度爆炸")
print("4. 早停: 防止过拟合")
print("5. 学习率调度: 动态调整学习率")
'''
    print(advanced_training_code)
    # 3. Model evaluation & testing listing (metrics, confusion matrix, ROC)
    print("\n3. 模型评估与测试:")
    evaluation_code = '''import torch
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
class ModelEvaluator:
"""模型评估器"""
def __init__(self, model, device):
self.model = model
self.device = device
def predict(self, dataloader):
"""批量预测"""
self.model.eval()
all_predictions = []
all_targets = []
all_probabilities = []
with torch.no_grad():
for inputs, targets in dataloader:
inputs = inputs.to(self.device)
# 前向传播
outputs = self.model(inputs)
# 获取预测结果
probabilities = torch.softmax(outputs, dim=1)
_, predictions = outputs.max(1)
# 收集结果
all_predictions.extend(predictions.cpu().numpy())
all_targets.extend(targets.numpy())
all_probabilities.extend(probabilities.cpu().numpy())
return {
'predictions': np.array(all_predictions),
'targets': np.array(all_targets),
'probabilities': np.array(all_probabilities)
}
def compute_metrics(self, predictions_dict):
"""计算评估指标"""
y_true = predictions_dict['targets']
y_pred = predictions_dict['predictions']
metrics = {
'accuracy': accuracy_score(y_true, y_pred),
'precision_macro': precision_score(y_true, y_pred, average='macro'),
'recall_macro': recall_score(y_true, y_pred, average='macro'),
'f1_macro': f1_score(y_true, y_pred, average='macro')
}
# 二分类特定指标
if len(np.unique(y_true)) == 2:
metrics['precision_binary'] = precision_score(y_true, y_pred)
metrics['recall_binary'] = recall_score(y_true, y_pred)
metrics['f1_binary'] = f1_score(y_true, y_pred)
return metrics
def plot_confusion_matrix(self, predictions_dict, class_names=None):
"""绘制混淆矩阵"""
y_true = predictions_dict['targets']
y_pred = predictions_dict['predictions']
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=class_names, yticklabels=class_names)
plt.xlabel('预测标签')
plt.ylabel('真实标签')
plt.title('混淆矩阵')
plt.tight_layout()
plt.show()
return cm
def print_classification_report(self, predictions_dict, class_names=None):
"""打印分类报告"""
y_true = predictions_dict['targets']
y_pred = predictions_dict['predictions']
report = classification_report(y_true, y_pred, target_names=class_names)
print("分类报告:")
print(report)
return report
def plot_roc_curve(self, predictions_dict, num_classes):
"""绘制ROC曲线(多分类)"""
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
y_true = predictions_dict['targets']
y_prob = predictions_dict['probabilities']
# 二值化标签
y_true_bin = label_binarize(y_true, classes=range(num_classes))
# 计算每个类别的ROC曲线和AUC
fpr = {}
tpr = {}
roc_auc = {}
plt.figure(figsize=(10, 8))
for i in range(num_classes):
fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_prob[:, i])
roc_auc[i] = auc(fpr[i], tpr[i])
plt.plot(fpr[i], tpr[i], lw=2,
label=f'类别 {i} (AUC = {roc_auc[i]:.2f})')
# 绘制对角线
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('假正率')
plt.ylabel('真正率')
plt.title('多分类ROC曲线')
plt.legend(loc="lower right")
plt.grid(alpha=0.3)
plt.show()
return fpr, tpr, roc_auc
def plot_training_history(self, history):
"""绘制训练历史"""
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
# 损失曲线
axes[0].plot(history['train']['loss'], label='训练损失')
axes[0].plot(history['val']['loss'], label='验证损失')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('损失')
axes[0].set_title('训练和验证损失')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 准确率曲线
axes[1].plot(history['train']['acc'], label='训练准确率')
axes[1].plot(history['val']['acc'], label='验证准确率')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('准确率 (%)')
axes[1].set_title('训练和验证准确率')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 使用示例
print("模型评估功能:")
print("1. 批量预测")
print("2. 计算多种评估指标")
print("3. 绘制混淆矩阵")
print("4. 打印分类报告")
print("5. 绘制ROC曲线")
print("6. 可视化训练历史")
'''
    print(evaluation_code)
    return None
# Run the training-loop walkthrough (prints tutorial listings; returns None).
basic_training_loop()
✅ 工程实践要点:
clip_grad_norm_() 是防止 RNN/LSTM 训练崩溃的必备项;
early_stopping_patience 建议设为 5–10,避免过早终止;
history 字典应持久化为 JSON,便于后续分析与可视化。
六、PyTorch模型部署与生产
1. 模型保存与加载
生产环境中,模型的可移植性与版本可控性至关重要。PyTorch 提供了多层级的序列化方案。
def model_saving_loading():
    """Print tutorial listings on PyTorch model saving, loading and deployment.

    Nothing is executed beyond printing: the function prints four code
    listings (stored as triple-quoted strings) covering basic save/load,
    cross-device loading, TorchScript serialization, and deployment options.

    Bug fix: the deployment listing embeds its own triple-quoted string
    (``torchserve_steps``). In the original it used ``'''``, which
    terminated the enclosing ``'''`` literal early and made the whole
    function a syntax error; the nested listing now uses ``\"\"\"``.

    Returns:
        None.
    """
    print("=" * 80)
    print("PyTorch模型保存与加载")
    print("=" * 80)
    # 1. Basic save/load listing (printed as text, not executed)
    print("\n1. 基础保存与加载:")
    basic_saving_code = '''import torch
import torch.nn as nn
import torch.optim as optim
# 创建示例模型
class SimpleModel(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(10, 20)
self.fc2 = nn.Linear(20, 1)
def forward(self, x):
x = torch.relu(self.fc1(x))
x = self.fc2(x)
return x
model = SimpleModel()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 1. 保存整个模型(不推荐)
print("方法1: 保存整个模型")
torch.save(model, 'model_complete.pth')
# 加载整个模型
loaded_model = torch.load('model_complete.pth')
print(f"加载完整模型: {type(loaded_model)}")
# 2. 保存模型状态字典(推荐)
print("\\n方法2: 保存模型状态字典")
torch.save(model.state_dict(), 'model_state_dict.pth')
# 创建新模型并加载状态字典
new_model = SimpleModel()
new_model.load_state_dict(torch.load('model_state_dict.pth'))
print(f"加载状态字典到新模型: {type(new_model)}")
# 3. 保存检查点(训练中保存)
print("\\n方法3: 保存检查点")
checkpoint = {
'epoch': 10,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': 0.05,
'accuracy': 0.95
}
torch.save(checkpoint, 'checkpoint.pth')
# 加载检查点
loaded_checkpoint = torch.load('checkpoint.pth')
model.load_state_dict(loaded_checkpoint['model_state_dict'])
optimizer.load_state_dict(loaded_checkpoint['optimizer_state_dict'])
epoch = loaded_checkpoint['epoch']
print(f"加载检查点: epoch={epoch}, loss={loaded_checkpoint['loss']}, accuracy={loaded_checkpoint['accuracy']}")
# 4. 保存多个模型
print("\\n方法4: 保存多个模型")
ensemble_models = {
'model1': model.state_dict(),
'model2': SimpleModel().state_dict(),
'metadata': {
'created_date': '2024-01-01',
'version': '1.0'
}
}
torch.save(ensemble_models, 'ensemble_models.pth')
'''
    print(basic_saving_code)
    # 2. Cross-device save/load listing
    print("\n2. 跨设备保存与加载:")
    cross_device_code = '''# 跨设备保存与加载
print("跨设备模型处理:")
# 创建GPU模型(如果有GPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_gpu = SimpleModel().to(device)
# 保存GPU模型
torch.save(model_gpu.state_dict(), 'model_gpu.pth')
# 加载到CPU
model_cpu = SimpleModel()
state_dict = torch.load('model_gpu.pth', map_location=torch.device('cpu'))
model_cpu.load_state_dict(state_dict)
print(f"GPU模型加载到CPU: 成功")
# 加载到GPU
if torch.cuda.is_available():
model_gpu2 = SimpleModel().to(device)
state_dict = torch.load('model_gpu.pth', map_location=device)
model_gpu2.load_state_dict(state_dict)
print(f"GPU模型加载到GPU: 成功")
# 处理不同GPU编号
print("\\n处理不同GPU编号:")
# 保存时指定map_location
def load_model_flexible(model_path, device):
"""灵活加载模型到指定设备"""
# 尝试加载到指定设备
try:
state_dict = torch.load(model_path, map_location=device)
except:
# 如果失败,加载到CPU然后移动到设备
state_dict = torch.load(model_path, map_location='cpu')
model = SimpleModel()
model.load_state_dict(state_dict)
model = model.to(device)
return model
print("灵活加载模型函数已创建")
'''
    print(cross_device_code)
    # 3. TorchScript serialization listing
    print("\n3. TorchScript和模型序列化:")
    torchscript_code = '''# TorchScript: 将PyTorch模型转换为可序列化的格式
print("TorchScript模型序列化:")
# 创建示例模型
model = SimpleModel()
model.eval() # 转换为推理模式
# 方法1: TorchScript Tracing(跟踪)
print("方法1: Tracing")
example_input = torch.randn(1, 10)
traced_model = torch.jit.trace(model, example_input)
# 保存TorchScript模型
traced_model.save('traced_model.pt')
print(f"Tracing模型已保存: traced_model.pt")
# 加载TorchScript模型
loaded_traced = torch.jit.load('traced_model.pt')
output = loaded_traced(example_input)
print(f"加载的Tracing模型输出: {output.shape}")
# 方法2: TorchScript Scripting(脚本)
print("\\n方法2: Scripting")
class SimpleModelWithControlFlow(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(10, 20)
self.fc2 = nn.Linear(20, 1)
self.threshold = 0.5
def forward(self, x):
x = torch.relu(self.fc1(x))
# 控制流(Tracing无法处理)
if x.mean() > self.threshold:
x = self.fc2(x)
else:
x = self.fc2(x) * 0.5
return x
model_with_control = SimpleModelWithControlFlow()
model_with_control.eval()
# 使用Scripting处理控制流
scripted_model = torch.jit.script(model_with_control)
scripted_model.save('scripted_model.pt')
print(f"Scripting模型已保存: scripted_model.pt")
# 方法3: TorchScript优化
print("\\n方法3: TorchScript优化")
# 融合操作(需要模型在GPU上)
if torch.cuda.is_available():
model_cuda = SimpleModel().cuda()
model_cuda.eval()
# 转换为TorchScript
traced_cuda = torch.jit.trace(model_cuda, torch.randn(1, 10).cuda())
# 应用优化
torch.jit.freeze(traced_cuda) # 冻结模型
traced_cuda = torch.jit.optimize_for_inference(traced_cuda) # 推理优化
traced_cuda.save('optimized_model.pt')
print(f"优化模型已保存: optimized_model.pt")
# TorchScript的优点
print("\\nTorchScript的优点:")
print("1. 模型可序列化,不依赖Python代码")
print("2. 可以在C++中加载和运行")
print("3. 可以进行模型优化")
print("4. 支持移动端部署")
'''
    print(torchscript_code)
    # 4. Deployment options listing. The nested torchserve_steps string inside
    # this listing MUST use a different triple-quote delimiter (""") than the
    # enclosing ''' literal, otherwise it terminates the outer string early.
    print("\n4. 模型部署选项:")
    deployment_code = '''# PyTorch模型部署选项
print("PyTorch模型部署选项:")
deployment_options = {
'TorchServe': {
'描述': 'PyTorch官方模型服务框架',
'特点': ['REST API', '模型版本管理', '自动批处理', '模型监控'],
'使用场景': '生产环境模型服务',
'安装': 'pip install torchserve torch-model-archiver'
},
'ONNX Runtime': {
'描述': '跨框架推理引擎',
'特点': ['高性能推理', '多硬件支持', '跨平台', '量化支持'],
'使用场景': '跨框架部署,性能关键应用',
'转换': 'torch.onnx.export()'
},
'TensorRT': {
'描述': 'NVIDIA高性能推理引擎',
'特点': ['极致性能', '低延迟', 'INT8/FP16量化', '动态形状'],
'使用场景': 'NVIDIA GPU上的高性能推理',
'要求': 'NVIDIA GPU, TensorRT SDK'
},
'Torch Mobile': {
'描述': '移动端部署',
'特点': ['iOS/Android支持', '模型优化', '离线推理'],
'使用场景': '移动应用',
'工具': 'PyTorch Mobile, LibTorch'
},
'FastAPI + PyTorch': {
'描述': '自定义API服务',
'特点': ['灵活控制', '易于定制', 'Python生态'],
'使用场景': '快速原型,定制需求',
'框架': 'FastAPI, Flask'
}
}
print("部署框架对比:")
for name, info in deployment_options.items():
print(f"\\n{name}:")
print(f" 描述: {info['描述']}")
print(f" 特点: {', '.join(info['特点'])}")
print(f" 使用场景: {info['使用场景']}")
# ONNX导出示例
print("\\nONNX导出示例:")
def export_to_onnx(model, input_shape, onnx_path='model.onnx'):
"""导出模型为ONNX格式"""
# 设置模型为评估模式
model.eval()
# 创建示例输入
dummy_input = torch.randn(*input_shape)
# 导出ONNX
torch.onnx.export(
model, # 要导出的模型
dummy_input, # 模型输入
onnx_path, # 输出文件路径
export_params=True, # 导出参数
opset_version=13, # ONNX opset版本
do_constant_folding=True, # 常量折叠优化
input_names=['input'], # 输入名称
output_names=['output'], # 输出名称
dynamic_axes={ # 动态轴(批处理维度)
'input': {0: 'batch_size'},
'output': {0: 'batch_size'}
}
)
print(f"模型已导出为: {onnx_path}")
return onnx_path
# TorchServe部署示例
print("\\nTorchServe部署步骤:")
torchserve_steps = """# 1. 创建模型存档
torch-model-archiver --model-name mymodel \\
--version 1.0 \\
--model-file model.py \\
--serialized-file model.pth \\
--handler image_classifier.py \\
--extra-files index_to_name.json
# 2. 启动TorchServe
torchserve --start --model-store model_store \\
--models mymodel=mymodel.mar \\
--ncs
# 3. 调用API
curl http://localhost:8080/predictions/mymodel -T test_image.jpg
"""
print(torchserve_steps)
print("\\n部署建议:")
print("1. 原型开发: FastAPI + PyTorch")
print("2. 生产服务: TorchServe")
print("3. 跨平台: ONNX Runtime")
print("4. 极致性能: TensorRT")
print("5. 移动端: PyTorch Mobile")
'''
    print(deployment_code)
    return None
# Run the saving/loading walkthrough (prints tutorial listings; returns None).
model_saving_loading()
🚀 选型决策树:
- 若你已有 Kubernetes 集群,且需灰度发布、AB 测试、流量镜像 —— 选 TorchServe;
- 若需部署到边缘设备(Jetson、树莓派)或 Web(WebAssembly)—— 选 ONNX Runtime;
- 若追求极致吞吐与低延迟(如广告 CTR 预估)—— 用 TensorRT + FP16;
- 若是 iOS/Android App 内嵌 AI 能力 —— PyTorch Mobile 是最轻量选择。
PyTorch 以其动态计算图和 Pythonic 的设计哲学,成为了深度学习研究和开发的首选框架之一。通过今天的学习,你已经掌握了 PyTorch 的核心概念和基本用法。记住:PyTorch 的强大之处在于其灵活性和直观性,这使得它特别适合研究和快速原型开发。
欢迎访问 云栈社区,获取更多 人工智能、智能 & 数据 & 云 和 Python 实战资源。