本文作者:icy

Build a Large Language Model (From Scratch) 从零构建大模型

icy 昨天 14 抢沙发
Build a Large Language Model (From Scratch) 从零构建大模型摘要: 从零构建大型语言模型(Large Language Model)我将带你一步步理解并构建一个简化版的大型语言模型。请注意,真正的工业级大模型需要大量计算资源和数据,但这里我们将构建...



从零构建大型语言模型(Large Language Model)

我将带你一步步理解并构建一个简化版的大型语言模型。请注意,真正的工业级大模型需要大量计算资源和数据,但这里我们将构建一个概念验证版本。

1. 项目概述

我们将构建一个基于Transformer架构的小型语言模型,包括以下核心组件:

  • 分词器(Tokenizer)

  • 词嵌入(Embeddings)

  • Transformer块(多头注意力+前馈网络)

  • 位置编码(Positional Encoding)

  • 训练循环

2. 环境准备

# requirements.txt
torch>=2.0.0
numpy>=1.24.0
tqdm>=4.65.0

3. 实现代码

3.1 分词器(Tokenizer)

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import math
from collections import Counter
import json

class SimpleTokenizer:
    """基于字符级别的简单分词器"""
    
    def __init__(self, text=None, vocab_size=512):
        self.vocab_size = vocab_size
        self.char_to_idx = {}
        self.idx_to_char = {}
        
        if text:
            self.build_vocab(text)
    
    def build_vocab(self, text):
        # 统计字符频率
        char_counts = Counter(text)
        most_common = char_counts.most_common(self.vocab_size - 2)
        
        # 构建词汇表
        self.char_to_idx = {'<PAD>': 0, '<UNK>': 1}
        self.idx_to_char = {0: '<PAD>', 1: '<UNK>'}
        
        idx = 2
        for char, _ in most_common:
            self.char_to_idx[char] = idx
            self.idx_to_char[idx] = char
            idx += 1
    
    def encode(self, text):
        """将文本转换为token IDs"""
        tokens = []
        for char in text:
            tokens.append(self.char_to_idx.get(char, 1))  # 1代表<UNK>
        return tokens
    
    def decode(self, tokens):
        """将token IDs转换回文本"""
        return ''.join([self.idx_to_char.get(token, '<UNK>') for token in tokens])
    
    def save(self, path):
        """保存分词器"""
        with open(path, 'w') as f:
            json.dump({
                'char_to_idx': self.char_to_idx,
                'idx_to_char': {int(k): v for k, v in self.idx_to_char.items()},
                'vocab_size': self.vocab_size
            }, f)
    
    def load(self, path):
        """加载分词器"""
        with open(path, 'r') as f:
            data = json.load(f)
        self.char_to_idx = data['char_to_idx']
        self.idx_to_char = {int(k): v for k, v in data['idx_to_char'].items()}
        self.vocab_size = data['vocab_size']

3.2 位置编码(Positional Encoding)

class PositionalEncoding(nn.Module):
    """Transformer的位置编码"""
    
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        
        # 创建位置编码矩阵
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        
        # 计算角度速率
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * 
                           (-math.log(10000.0) / d_model))
        
        # 应用正弦和余弦函数
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        pe = pe.unsqueeze(0)  # [1, max_len, d_model]
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        # x: [batch_size, seq_len, d_model]
        x = x + self.pe[:, :x.size(1), :]
        return x

3.3 多头注意力机制(Multi-Head Attention)

class MultiHeadAttention(nn.Module):
    """多头注意力机制"""
    
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        assert d_model % num_heads == 0, "d_model必须能被num_heads整除"
        
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        
        # 线性变换层
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        
        self.dropout = nn.Dropout(dropout)
    
    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """缩放点积注意力"""
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
        
        attn_probs = F.softmax(attn_scores, dim=-1)
        attn_probs = self.dropout(attn_probs)
        
        output = torch.matmul(attn_probs, V)
        return output, attn_probs
    
    def split_heads(self, x):
        """分割多头"""
        batch_size, seq_len, d_model = x.size()
        return x.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
    
    def combine_heads(self, x):
        """合并多头"""
        batch_size, _, seq_len, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
    
    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        
        # 线性变换并分割多头
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        
        # 计算注意力
        attn_output, attn_probs = self.scaled_dot_product_attention(Q, K, V, mask)
        
        # 合并多头并线性变换
        output = self.W_o(self.combine_heads(attn_output))
        
        return output, attn_probs

3.4 前馈网络(Feed Forward Network)

class FeedForward(nn.Module):
    """前馈网络(两个线性层)"""
    
    def __init__(self, d_model, d_ff=2048, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(d_ff, d_model)
    
    def forward(self, x):
        return self.linear2(self.dropout(F.relu(self.linear1(x))))

3.5 Transformer编码器层

class TransformerEncoderLayer(nn.Module):
    """Transformer编码器层"""
    
    def __init__(self, d_model, num_heads, d_ff=2048, dropout=0.1):
        super().__init__()
        
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        
        self.feed_forward = FeedForward(d_model, d_ff, dropout)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout2 = nn.Dropout(dropout)
    
    def forward(self, x, mask=None):
        # 多头注意力 + 残差连接 + 层归一化
        attn_output, _ = self.self_attn(x, x, x, mask)
        x = x + self.dropout1(attn_output)
        x = self.norm1(x)
        
        # 前馈网络 + 残差连接 + 层归一化
        ff_output = self.feed_forward(x)
        x = x + self.dropout2(ff_output)
        x = self.norm2(x)
        
        return x

3.6 完整的语言模型

class SimpleLLM(nn.Module):
    """简化版的大型语言模型"""
    
    def __init__(self, vocab_size, d_model=256, num_layers=6, 
                 num_heads=8, d_ff=1024, max_seq_len=512, dropout=0.1):
        super().__init__()
        
        self.vocab_size = vocab_size
        self.d_model = d_model
        
        # 词嵌入层
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        
        # 位置编码
        self.positional_encoding = PositionalEncoding(d_model, max_seq_len)
        
        # Transformer编码器层
        self.layers = nn.ModuleList([
            TransformerEncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        
        # 层归一化
        self.norm = nn.LayerNorm(d_model)
        
        # 输出层
        self.output_layer = nn.Linear(d_model, vocab_size)
        
        # 初始化权重
        self._init_weights()
    
    def _init_weights(self):
        """初始化模型权重"""
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)
    
    def create_mask(self, seq):
        """创建注意力掩码(因果掩码)"""
        batch_size, seq_len = seq.size()
        # 下三角矩阵(包括对角线)
        mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0).unsqueeze(0)
        return mask
    
    def forward(self, input_ids):
        batch_size, seq_len = input_ids.size()
        
        # 创建因果掩码
        mask = self.create_mask(input_ids).to(input_ids.device)
        
        # 词嵌入
        token_embeds = self.token_embedding(input_ids)  # [batch, seq_len, d_model]
        
        # 添加位置编码
        x = self.positional_encoding(token_embeds)
        
        # 通过Transformer层
        for layer in self.layers:
            x = layer(x, mask)
        
        # 层归一化
        x = self.norm(x)
        
        # 输出logits
        logits = self.output_layer(x)  # [batch, seq_len, vocab_size]
        
        return logits
    
    def generate(self, prompt, tokenizer, max_length=100, temperature=1.0, top_k=50):
        """生成文本"""
        self.eval()
        
        # 编码提示文本
        tokens = tokenizer.encode(prompt)
        tokens_tensor = torch.tensor([tokens]).long()
        
        if torch.cuda.is_available():
            tokens_tensor = tokens_tensor.cuda()
        
        # 生成文本
        for _ in range(max_length):
            # 前向传播
            with torch.no_grad():
                logits = self.forward(tokens_tensor)
                next_token_logits = logits[0, -1, :] / temperature
            
            # Top-k采样
            if top_k > 0:
                top_k_values, top_k_indices = torch.topk(next_token_logits, top_k)
                next_token_logits[~torch.isin(torch.arange(len(next_token_logits)).to(next_token_logits.device), 
                                            top_k_indices)] = -float('inf')
            
            # 采样下一个token
            probs = F.softmax(next_token_logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            
            # 添加到序列
            tokens_tensor = torch.cat([tokens_tensor, next_token.unsqueeze(0)], dim=1)
            
            # 如果生成了结束符,停止生成
            if next_token.item() == 0:  # <PAD>作为结束符
                break
        
        # 解码为文本
        generated_tokens = tokens_tensor[0].tolist()
        generated_text = tokenizer.decode(generated_tokens)
        
        return generated_text

3.7 训练脚本

import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

class TextDataset(Dataset):
    """文本数据集"""
    
    def __init__(self, texts, tokenizer, seq_length=128):
        self.tokenizer = tokenizer
        self.seq_length = seq_length
        self.data = []
        
        for text in texts:
            tokens = tokenizer.encode(text)
            # 分割成长度为seq_length+1的序列(+1是因为需要预测下一个token)
            for i in range(0, len(tokens) - seq_length, seq_length // 2):
                chunk = tokens[i:i + seq_length + 1]
                if len(chunk) == seq_length + 1:
                    self.data.append(chunk)
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        chunk = self.data[idx]
        input_ids = torch.tensor(chunk[:-1], dtype=torch.long)
        target_ids = torch.tensor(chunk[1:], dtype=torch.long)
        return input_ids, target_ids

def train_model(model, dataset, epochs=10, batch_size=32, lr=3e-4):
    """训练模型"""
    
    # 数据加载器
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    
    # 优化器
    optimizer = optim.AdamW(model.parameters(), lr=lr)
    
    # 学习率调度器
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    
    # 损失函数
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # 忽略<PAD>
    
    # 训练循环
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        
        progress_bar = tqdm(dataloader, desc=f'Epoch {epoch+1}/{epochs}')
        for batch_idx, (input_ids, target_ids) in enumerate(progress_bar):
            input_ids, target_ids = input_ids.to(device), target_ids.to(device)
            
            # 前向传播
            logits = model(input_ids)
            
            # 计算损失
            # 调整logits和targets的形状以匹配CrossEntropyLoss的要求
            loss = criterion(logits.view(-1, logits.size(-1)), target_ids.view(-1))
            
            # 反向传播
            optimizer.zero_grad()
            loss.backward()
            
            # 梯度裁剪
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            
            # 更新参数
            optimizer.step()
            
            total_loss += loss.item()
            progress_bar.set_postfix({'loss': loss.item()})
        
        # 更新学习率
        scheduler.step()
        
        avg_loss = total_loss / len(dataloader)
        print(f'Epoch {epoch+1}: Average Loss = {avg_loss:.4f}')
        
        # 保存模型检查点
        if (epoch + 1) % 5 == 0:
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': avg_loss,
            }, f'model_checkpoint_epoch_{epoch+1}.pth')
    
    return model

def load_data(file_path):
    """加载训练数据"""
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()
    
    # 分割成较小的文本块
    chunk_size = 1000
    texts = [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]
    
    return texts

# 主训练函数
def main():
    # 加载数据
    print("Loading data...")
    texts = load_data('training_data.txt')  # 替换为你的训练数据文件
    
    # 初始化分词器
    print("Building vocabulary...")
    tokenizer = SimpleTokenizer(''.join(texts), vocab_size=512)
    
    # 创建数据集
    print("Creating dataset...")
    dataset = TextDataset(texts, tokenizer, seq_length=128)
    
    # 初始化模型
    print("Initializing model...")
    model = SimpleLLM(
        vocab_size=tokenizer.vocab_size,
        d_model=256,
        num_layers=4,
        num_heads=8,
        d_ff=1024,
        max_seq_len=128
    )
    
    # 打印模型参数数量
    total_params = sum(p.numel() for p in model.parameters())
    print(f"Total parameters: {total_params:,}")
    
    # 训练模型
    print("Starting training...")
    model = train_model(
        model=model,
        dataset=dataset,
        epochs=20,
        batch_size=16,
        lr=3e-4
    )
    
    # 保存最终模型
    print("Saving final model...")
    torch.save(model.state_dict(), 'final_model.pth')
    tokenizer.save('tokenizer.json')
    
    # 测试生成
    print("\nTesting generation...")
    prompt = "Once upon a time"
    generated = model.generate(prompt, tokenizer, max_length=50, temperature=0.8)
    print(f"Prompt: {prompt}")
    print(f"Generated: {generated}")

if __name__ == "__main__":
    main()

4. 模型优化建议

4.1 性能优化

# 使用混合精度训练(如果GPU支持)
from torch.cuda.amp import autocast, GradScaler

def train_with_amp(model, dataloader, epochs=10, lr=3e-4):
    scaler = GradScaler()
    optimizer = optim.AdamW(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        for input_ids, target_ids in dataloader:
            optimizer.zero_grad()
            
            with autocast():
                logits = model(input_ids)
                loss = F.cross_entropy(logits.view(-1, logits.size(-1)), 
                                      target_ids.view(-1), ignore_index=0)
            
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

4.2 更大的模型配置

class MediumLLM(SimpleLLM):
    """中等规模的LLM"""
    
    def __init__(self, vocab_size):
        super().__init__(
            vocab_size=vocab_size,
            d_model=768,        # 增加嵌入维度
            num_layers=12,      # 增加层数
            num_heads=12,       # 增加注意力头数
            d_ff=3072,         # 增加前馈网络维度
            max_seq_len=1024,   # 增加序列长度
            dropout=0.1
        )

class LargeLLM(SimpleLLM):
    """大规模LLM(类似GPT-2 Small)"""
    
    def __init__(self, vocab_size=50257):  # GPT-2的词汇表大小
        super().__init__(
            vocab_size=vocab_size,
            d_model=768,
            num_layers=12,
            num_heads=12,
            d_ff=3072,
            max_seq_len=1024,
            dropout=0.1
        )

5. 部署和使用

5.1 推理脚本

def load_model_for_inference(model_path, tokenizer_path, device='cpu'):
    """加载模型进行推理"""
    # 加载分词器
    tokenizer = SimpleTokenizer()
    tokenizer.load(tokenizer_path)
    
    # 初始化模型
    model = SimpleLLM(vocab_size=tokenizer.vocab_size)
    
    # 加载权重
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()
    
    return model, tokenizer

def interactive_generation(model, tokenizer):
    """交互式文本生成"""
    print("Interactive Text Generation")
    print("Type 'quit' to exit")
    print("-" * 50)
    
    while True:
        prompt = input("\nEnter prompt: ")
        if prompt.lower() == 'quit':
            break
        
        max_length = int(input("Max length (default 100): ") or "100")
        temperature = float(input("Temperature (default 1.0): ") or "1.0")
        
        generated = model.generate(
            prompt, 
            tokenizer, 
            max_length=max_length, 
            temperature=temperature
        )
        
        print("\nGenerated text:")
        print("-" * 50)
        print(generated)
        print("-" * 50)

# 运行推理
if __name__ == "__main__":
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model, tokenizer = load_model_for_inference(
        'final_model.pth', 
        'tokenizer.json', 
        device
    )
    interactive_generation(model, tokenizer)

6. 注意事项和限制

6.1 当前实现的限制

  1. 规模有限:相比真正的LLM(数十亿参数),这个实现很小

  2. 训练数据需求:需要大量高质量文本数据

  3. 计算资源:即使这个小模型也需要相当的计算资源

  4. 没有预训练:这是从头训练,没有使用预训练权重

6.2 扩展方向

  1. 使用预训练权重:从Hugging Face等平台加载预训练模型

  2. 分布式训练:使用多GPU或TPU进行训练

  3. 更高效的分词器:实现BPE或WordPiece分词器

  4. 模型量化:减少模型大小,加速推理

7. 资源需求估算

模型大小参数量GPU内存训练时间适合的数据集
Small (本实现)~5M2-4GB小时级小型文本
Medium~100M8-16GB天级维基百科
Large~1B32GB+周级大型语料库
从零构建大模型.pdf
类型:TXT文件|已下载:0|下载方式:免费下载
立即下载
文章版权及转载声明

作者:icy本文地址:https://www.zelig.cn/2026/02/176.html发布于 昨天
文章转载或复制请以超链接形式并注明出处软角落-SoftNook

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

阅读
分享

发表评论

快捷回复:

验证码

评论列表 (暂无评论,14人围观)参与讨论

还没有评论,来说两句吧...