找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

237

积分

0

好友

29

主题
发表于 13 小时前 | 查看: 1| 回复: 0
import math
import pandas as pd
import torch
from torch import nn
from d2l import torch as d2l

1. 基于位置的前馈网络 (PositionWiseFFN)

该模块即Transformer中的前馈神经网络层,它对序列中每个位置的向量进行独立且相同的变换。

构造函数接收三个核心参数:

  • ffn_num_input: 输入特征维度,通常与注意力层的输出维度 d_model 一致。
  • ffn_num_hiddens: 中间隐藏层维度,在原论文中通常被放大为 d_model 的4倍。
  • ffn_num_outputs: 输出特征维度,通常与 d_model 相等,以保持各子层输入输出维度一致,便于残差连接。
class PositionWiseFFN(nn.Module):
    """基于位置的前馈网络"""
    def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_outputs, **kwargs):
        super(PositionWiseFFN, self).__init__(**kwargs)
        # 第一个全连接层,将输入维度扩展至隐藏维度
        self.dense1 = nn.Linear(ffn_num_input, ffn_num_hiddens)
        # ReLU激活函数
        self.relu = nn.ReLU()
        # 第二个全连接层,将维度映射回输出维度
        self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_outputs)

    def forward(self, X):
        # 前向传播:线性变换 -> ReLU -> 线性变换
        return self.dense2(self.relu(self.dense1(X)))

以下是一个简单的实例化与测试示例,帮助你理解其输入输出形状:

# 实例化一个FFN层:输入4维,隐藏层4维,输出8维
ffn = PositionWiseFFN(4, 4, 8)
ffn.eval()  # 切换到评估模式

# 构造假数据:批次大小2,序列长度3,特征维度4
x = torch.ones((2, 3, 4))
out = ffn(x)
print(out.shape)  # 输出形状应为 torch.Size([2, 3, 8])
out0 = out[0]  # 取批次中的第一个样本
print(out0.shape)  # 输出形状应为 torch.Size([3, 8])

输出:

torch.Size([2, 3, 8])
torch.Size([3, 8])

2. 残差连接与层规范化 (AddNorm)

Transformer使用层规范化(LayerNorm)而非批规范化(BatchNorm)。我们先通过一个简单的例子对比两者差异。如果你想深入了解其他常见的神经网络优化技巧,可以阅读关于算法与数据结构的专题。

# 创建LayerNorm和BatchNorm1d层,特征维度均为2
ln = nn.LayerNorm(2)
bn = nn.BatchNorm1d(2)

# 构造数据
X = torch.tensor([[1, 2], [2, 3]], dtype=torch.float32)
print('layer norm:', ln(X))
print('batch norm:', bn(X))

输出:

layer norm: tensor([[-1.0000,  1.0000],
        [-1.0000,  1.0000]], grad_fn=<NativeLayerNormBackward0>)
batch norm: tensor([[-1.0000, -1.0000],
        [ 1.0000,  1.0000]], grad_fn=<NativeBatchNormBackward0>)

说明LayerNorm 对每个样本内部的特征进行归一化(均值为0,方差为1),而 BatchNorm1d 则是对一个批次中每个特征维度跨样本进行归一化。

现在,我们可以实现 AddNorm 类,它先将子层(如注意力或FFN)的输出与输入进行残差连接,再进行层规范化,并使用Dropout防止过拟合。

class AddNorm(nn.Module):
    """残差连接后进行层规范化"""
    def __init__(self, normalized_shape, dropout, **kwargs):
        super(AddNorm, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(normalized_shape)

    def forward(self, X, Y):
        # Y是子层输出,X是子层输入。执行流程:Dropout(Y) -> 加X -> LayerNorm
        return self.ln(self.dropout(Y) + X)

测试该模块:

add_norm = AddNorm([3, 4], 0.5)  # 对最后两维(3,4)进行归一化
add_norm.eval()
X = torch.ones((2, 3, 4))
Y = torch.ones((2, 3, 4))
out = add_norm(X, Y)
print(out.shape)  # 输出形状保持不变: torch.Size([2, 3, 4])

输出:

torch.Size([2, 3, 4])

3. Transformer编码器块与堆叠

编码器块是Transformer的核心组件,包含一个多头自注意力子层和一个前馈网络子层,每个子层后都接有 AddNorm

class EncoderBlock(nn.Module):
    """Transformer编码器块"""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, dropout, use_bias=False, **kwargs):
        super(EncoderBlock, self).__init__(**kwargs)
        # 多头自注意力层
        self.attention = d2l.MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout, use_bias)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        # 前馈网络层
        self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens, num_hiddens)
        self.addnorm2 = AddNorm(norm_shape, dropout)

    def forward(self, X, valid_lens):
        # 流程:自注意力 -> AddNorm -> FFN -> AddNorm
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))

编码器块不会改变输入张量的形状。让我们验证一下:

X = torch.ones((2, 100, 24))
valid_lens = torch.tensor([3, 2])  # 序列有效长度

encoder_blk = EncoderBlock(24, 24, 24, 24, [100, 24], 24, 48, 8, 0.5)
encoder_blk.eval()
print(encoder_blk(X, valid_lens).shape)  # 输出: torch.Size([2, 100, 24])

接下来,我们将多个编码器块堆叠起来,并加上词嵌入和位置编码,构成完整的Transformer编码器。

class TransformerEncoder(d2l.Encoder):
    """Transformer编码器"""
    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape,
                 ffn_num_input, ffn_num_hiddens, num_heads, num_layers,
                 dropout, use_bias=False, **kwargs):
        super(TransformerEncoder, self).__init__(**kwargs)
        self.num_hiddens = num_hiddens
        # 词嵌入层
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        # 位置编码
        self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)
        # 堆叠多个编码器块
        self.blks = nn.Sequential()
        for i in range(num_layers):
            self.blks.add_module("block"+str(i),
                EncoderBlock(key_size, query_size, value_size, num_hiddens,
                             norm_shape, ffn_num_input, ffn_num_hiddens,
                             num_heads, dropout, use_bias))

    def forward(self, X, valid_lens):
        # 1. 词嵌入并缩放,然后与位置编码相加
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        # 2. 存储各层注意力权重(用于可视化)
        self.attention_weights = [None] * len(self.blks)
        # 3. 逐层通过编码器块
        for i, blk in enumerate(self.blks):
            X = blk(X, valid_lens)
            self.attention_weights[i] = blk.attention.attention.attention_weights
        return X

实例化一个两层的编码器进行测试,这通常与使用Python进行模型构建的流程类似。

encoder = TransformerEncoder(
    vocab_size=200, key_size=24, query_size=24, value_size=24,
    num_hiddens=24, norm_shape=[100, 24],
    ffn_num_input=24, ffn_num_hiddens=48,
    num_heads=8, num_layers=2, dropout=0.5)
encoder.eval()
valid_lens = torch.tensor([3, 2])
output = encoder(torch.ones((2, 100), dtype=torch.long), valid_lens)
print(output.shape)  # 输出: torch.Size([2, 100, 24])

4. Transformer解码器块与堆叠

解码器块比编码器块更复杂,包含三个子层:掩码多头自注意力层、编码器-解码器注意力层(交叉注意力)和前馈网络层。

class DecoderBlock(nn.Module):
    """解码器中第i个块"""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, i, **kwargs):
        super(DecoderBlock, self).__init__(**kwargs)
        self.i = i
        # 第一层:掩码多头自注意力(防止看到未来信息)
        self.attention1 = d2l.MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        # 第二层:编码器-解码器多头注意力
        self.attention2 = d2l.MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout)
        self.addnorm2 = AddNorm(norm_shape, dropout)
        # 第三层:前馈网络
        self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens, num_hiddens)
        self.addnorm3 = AddNorm(norm_shape, dropout)

    def forward(self, X, state):
        # state: [编码器输出, 编码器有效长度, 各层历史缓存列表]
        enc_outputs, enc_valid_lens = state[0], state[1]

        # 训练与推理时,处理自注意力的Key/Value方式不同
        if state[2][self.i] is None:
            key_values = X
        else:
            key_values = torch.cat((state[2][self.i], X), dim=1)
        state[2][self.i] = key_values

        # 训练时,使用下三角掩码;推理时,掩码由自回归过程隐式保证
        if self.training:
            batch_size, num_steps, _ = X.shape
            dec_valid_lens = torch.arange(1, num_steps+1, device=X.device).repeat(batch_size, 1)
        else:
            dec_valid_lens = None

        # 第一层:掩码自注意力
        X2 = self.attention1(X, key_values, key_values, dec_valid_lens)
        Y = self.addnorm1(X, X2)
        # 第二层:交叉注意力 (Query来自解码器,Key/Value来自编码器)
        Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
        Z = self.addnorm2(Y, Y2)
        # 第三层:前馈网络
        return self.addnorm3(Z, self.ffn(Z)), state

现在,我们将解码器块堆叠起来,并添加词嵌入、位置编码和最终的线性输出层,构成完整的Transformer解码器。

class TransformerDecoder(d2l.AttentionDecoder):
    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, **kwargs):
        super(TransformerDecoder, self).__init__(**kwargs)
        self.num_hiddens = num_hiddens
        self.num_layers = num_layers
        # 词嵌入与位置编码
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)
        # 堆叠解码器块
        self.blks = nn.Sequential()
        for i in range(num_layers):
            self.blks.add_module("block"+str(i),
                DecoderBlock(key_size, query_size, value_size, num_hiddens,
                             norm_shape, ffn_num_input, ffn_num_hiddens,
                             num_heads, dropout, i))
        # 最终的线性层,输出词汇表概率分布
        self.dense = nn.Linear(num_hiddens, vocab_size)

    def init_state(self, enc_outputs, enc_valid_lens, *args):
        # 初始化解码器状态
        return [enc_outputs, enc_valid_lens, [None] * self.num_layers]

    def forward(self, X, state):
        # 嵌入与位置编码
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        # 存储注意力权重(用于可视化分析)
        self._attention_weights = [[None] * len(self.blks) for _ in range(2)]

        for i, blk in enumerate(self.blks):
            X, state = blk(X, state)
            # 记录解码器自注意力和编码器-解码器注意力的权重
            self._attention_weights[0][i] = blk.attention1.attention.attention_weights
            self._attention_weights[1][i] = blk.attention2.attention.attention_weights
        # 通过线性层生成最终输出
        return self.dense(X), state

    @property
    def attention_weights(self):
        return self._attention_weights

为了方便编码器与解码器的联合训练与推理,我们定义一个简单的封装类。这体现了构建复杂人工智能模型时常见的模块化设计思想。

class EncoderDecoderWrapper(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, enc_X, dec_input, enc_valid_len):
        enc_outputs = self.encoder(enc_X, enc_valid_len)
        state = self.decoder.init_state(enc_outputs, enc_valid_len)
        out = self.decoder(dec_input, state)
        y_hat = out[0] if isinstance(out, (tuple, list)) else out
        att = None
        if hasattr(self.decoder, "attention_weights"):
            try:
                att = self.decoder.attention_weights
            except Exception:
                att = None
        return y_hat, att

5. 模型训练与推理

现在,我们使用定义好的Transformer架构在一个机器翻译任务上进行训练。

# 定义超参数
num_hiddens, num_layers, dropout, batch_size, num_steps = 32, 2, 0.1, 64, 10
lr, num_epochs, device = 0.005, 200, d2l.try_gpu()
ffn_num_input, ffn_num_hiddens, num_heads = 32, 64, 4
key_size = query_size = value_size = 32
norm_shape = [32]

# 加载数据集
train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)

# 实例化编码器和解码器
encoder = TransformerEncoder(
    len(src_vocab), key_size, query_size, value_size, num_hiddens,
    norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
    num_layers, dropout)
decoder = TransformerDecoder(
    len(tgt_vocab), key_size, query_size, value_size, num_hiddens,
    norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
    num_layers, dropout)

# 组装并训练模型
net = EncoderDecoderWrapper(encoder, decoder)
d2l.train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)

训练过程中,损失函数值随训练轮次下降的曲线如下图所示,这是评估模型学习效果的重要指标。
损失随训练轮次变化图
训练损失随迭代轮数(epoch)下降的曲线,表明模型正在有效学习。

训练结束后,我们可以使用训练好的模型进行简单的翻译推理,并观察其效果。

engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
    translation, dec_attention_weight_seq = d2l.predict_seq2seq(
        net, eng, src_vocab, tgt_vocab, num_steps, device, True)
    print(f'{eng} => {translation}, bleu {d2l.bleu(translation, fra, k=2):.3f}')



上一篇:嵌入式软件工程师C语言面试30问:核心概念、内存管理与高频考点解析
下一篇:6种核心缓存技术深度解析:本地、Redis到数据库缓存实战指南
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-17 16:02 , Processed in 0.119818 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表