Building a Large Language Model (LLM) from Scratch
This guide walks through understanding and building a simplified large language model step by step. Note that real industrial-scale LLMs require enormous amounts of compute and data; here we build a proof-of-concept version.
1. Project Overview
We will build a small Transformer-based language model with the following core components (a short end-to-end sketch follows this list):
Tokenizer
Token embeddings
Transformer blocks (multi-head attention + feed-forward network)
Positional encoding
Training loop
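To show how these pieces connect, here is a brief orientation sketch; it assumes the classes and imports defined in Section 3 below, and the hyperparameters are illustrative only:

# End-to-end sketch (assumes the classes from Section 3 are defined)
tokenizer = SimpleTokenizer("hello world", vocab_size=512)
model = SimpleLLM(vocab_size=tokenizer.vocab_size, d_model=256, num_layers=4,
                  num_heads=8, d_ff=1024, max_seq_len=128)
input_ids = torch.tensor([tokenizer.encode("hello")])   # [1, seq_len]
logits = model(input_ids)                               # [1, seq_len, vocab_size]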
2. Environment Setup
# requirements.txt
torch>=2.0.0
numpy>=1.24.0
tqdm>=4.65.0
3. Implementation
3.1 Tokenizer
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import math
from collections import Counter
import json
class SimpleTokenizer:
    """A simple character-level tokenizer."""
    def __init__(self, text=None, vocab_size=512):
        self.vocab_size = vocab_size
        self.char_to_idx = {}
        self.idx_to_char = {}
        if text:
            self.build_vocab(text)

    def build_vocab(self, text):
        # Count character frequencies
        char_counts = Counter(text)
        most_common = char_counts.most_common(self.vocab_size - 2)
        # Build the vocabulary
        self.char_to_idx = {'<PAD>': 0, '<UNK>': 1}
        self.idx_to_char = {0: '<PAD>', 1: '<UNK>'}
        idx = 2
        for char, _ in most_common:
            self.char_to_idx[char] = idx
            self.idx_to_char[idx] = char
            idx += 1

    def encode(self, text):
        """Convert text into token IDs."""
        tokens = []
        for char in text:
            tokens.append(self.char_to_idx.get(char, 1))  # 1 is <UNK>
        return tokens

    def decode(self, tokens):
        """Convert token IDs back into text."""
        return ''.join([self.idx_to_char.get(token, '<UNK>') for token in tokens])

    def save(self, path):
        """Save the tokenizer."""
        with open(path, 'w') as f:
            json.dump({
                'char_to_idx': self.char_to_idx,
                'idx_to_char': {int(k): v for k, v in self.idx_to_char.items()},
                'vocab_size': self.vocab_size
            }, f)

    def load(self, path):
        """Load the tokenizer."""
        with open(path, 'r') as f:
            data = json.load(f)
        self.char_to_idx = data['char_to_idx']
        self.idx_to_char = {int(k): v for k, v in data['idx_to_char'].items()}
        self.vocab_size = data['vocab_size']

3.2 Positional Encoding
class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding for the Transformer."""
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        # Create the positional encoding matrix
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # Compute the frequency terms
        div_term = torch.exp(torch.arange(0, d_model, 2).float() *
                             (-math.log(10000.0) / d_model))
        # Apply sine to even indices and cosine to odd indices
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # [1, max_len, d_model]
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: [batch_size, seq_len, d_model]
        x = x + self.pe[:, :x.size(1), :]
        return x

3.3 Multi-Head Attention
class MultiHeadAttention(nn.Module):
    """Multi-head attention mechanism."""
    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        # Linear projections
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Scaled dot-product attention."""
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
        attn_probs = F.softmax(attn_scores, dim=-1)
        attn_probs = self.dropout(attn_probs)
        output = torch.matmul(attn_probs, V)
        return output, attn_probs

    def split_heads(self, x):
        """Split the last dimension into (num_heads, d_k)."""
        batch_size, seq_len, d_model = x.size()
        return x.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)

    def combine_heads(self, x):
        """Merge the heads back into a single dimension."""
        batch_size, _, seq_len, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        # Project and split into heads
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        # Compute attention
        attn_output, attn_probs = self.scaled_dot_product_attention(Q, K, V, mask)
        # Merge heads and apply the output projection
        output = self.W_o(self.combine_heads(attn_output))
        return output, attn_probs
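As a quick sanity check (the tensor sizes below are illustrative, not part of the original code), the causal mask keeps each position from attending to later positions, so the first query row should put essentially all of its weight on position 0:

# Shape and masking check for MultiHeadAttention
mha = MultiHeadAttention(d_model=256, num_heads=8, dropout=0.0)
x = torch.randn(1, 5, 256)                                     # [batch, seq_len, d_model]
mask = torch.tril(torch.ones(5, 5)).unsqueeze(0).unsqueeze(0)  # [1, 1, seq, seq] causal mask
out, attn = mha(x, x, x, mask)
print(out.shape)      # torch.Size([1, 5, 256])
print(attn[0, 0, 0])  # first query row: ~[1, 0, 0, 0, 0], future positions are masked out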
3.4 Feed-Forward Network
class FeedForward(nn.Module):
    """Position-wise feed-forward network (two linear layers)."""
    def __init__(self, d_model, d_ff=2048, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        return self.linear2(self.dropout(F.relu(self.linear1(x))))
3.5 Transformer Encoder Layer
class TransformerEncoderLayer(nn.Module):
    """A single Transformer encoder layer."""
    def __init__(self, d_model, num_heads, d_ff=2048, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.feed_forward = FeedForward(d_model, d_ff, dropout)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # Multi-head attention + residual connection + layer norm
        attn_output, _ = self.self_attn(x, x, x, mask)
        x = x + self.dropout1(attn_output)
        x = self.norm1(x)
        # Feed-forward network + residual connection + layer norm
        ff_output = self.feed_forward(x)
        x = x + self.dropout2(ff_output)
        x = self.norm2(x)
        return x
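Because each layer maps [batch, seq_len, d_model] to the same shape, layers can be stacked freely, which is exactly what the model below does. A minimal shape check (illustrative sizes):

layer = TransformerEncoderLayer(d_model=256, num_heads=8, d_ff=1024)
x = torch.randn(2, 16, 256)                                      # [batch, seq_len, d_model]
mask = torch.tril(torch.ones(16, 16)).unsqueeze(0).unsqueeze(0)  # causal mask
out = layer(x, mask)
print(out.shape)  # torch.Size([2, 16, 256]) -- shape preserved, so layers stack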
3.6 The Complete Language Model
class SimpleLLM(nn.Module):
    """A simplified large language model."""
    def __init__(self, vocab_size, d_model=256, num_layers=6,
                 num_heads=8, d_ff=1024, max_seq_len=512, dropout=0.1):
        super().__init__()
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.max_seq_len = max_seq_len
        # Token embedding layer
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        # Positional encoding
        self.positional_encoding = PositionalEncoding(d_model, max_seq_len)
        # Transformer encoder layers
        self.layers = nn.ModuleList([
            TransformerEncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        # Final layer normalization
        self.norm = nn.LayerNorm(d_model)
        # Output projection
        self.output_layer = nn.Linear(d_model, vocab_size)
        # Initialize weights
        self._init_weights()

    def _init_weights(self):
        """Initialize the model weights."""
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def create_mask(self, seq):
        """Create the attention mask (causal mask)."""
        batch_size, seq_len = seq.size()
        # Lower-triangular matrix (including the diagonal)
        mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0).unsqueeze(0)
        return mask

    def forward(self, input_ids):
        batch_size, seq_len = input_ids.size()
        # Create the causal mask
        mask = self.create_mask(input_ids).to(input_ids.device)
        # Token embeddings
        token_embeds = self.token_embedding(input_ids)  # [batch, seq_len, d_model]
        # Add positional encoding
        x = self.positional_encoding(token_embeds)
        # Pass through the Transformer layers
        for layer in self.layers:
            x = layer(x, mask)
        # Final layer normalization
        x = self.norm(x)
        # Output logits
        logits = self.output_layer(x)  # [batch, seq_len, vocab_size]
        return logits
    def generate(self, prompt, tokenizer, max_length=100, temperature=1.0, top_k=50):
        """Generate text from a prompt."""
        self.eval()
        # Encode the prompt on the same device as the model
        device = next(self.parameters()).device
        tokens = tokenizer.encode(prompt)
        tokens_tensor = torch.tensor([tokens], dtype=torch.long, device=device)
        # Generate tokens autoregressively
        for _ in range(max_length):
            # Forward pass (crop the context to the model's maximum sequence length)
            with torch.no_grad():
                input_ids = tokens_tensor[:, -self.max_seq_len:]
                logits = self.forward(input_ids)
                next_token_logits = logits[0, -1, :] / temperature
            # Top-k sampling: keep only the k most likely tokens
            if top_k > 0:
                top_k_values, _ = torch.topk(next_token_logits,
                                             min(top_k, next_token_logits.size(-1)))
                next_token_logits[next_token_logits < top_k_values[-1]] = -float('inf')
            # Sample the next token
            probs = F.softmax(next_token_logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            # Append it to the sequence
            tokens_tensor = torch.cat([tokens_tensor, next_token.unsqueeze(0)], dim=1)
            # Stop if the end token is generated
            if next_token.item() == 0:  # <PAD> is treated as the end token
                break
        # Decode back to text
        generated_tokens = tokens_tensor[0].tolist()
        generated_text = tokenizer.decode(generated_tokens)
        return generated_text

3.7 Training Script
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
class TextDataset(Dataset):
    """Text dataset for next-token prediction."""
    def __init__(self, texts, tokenizer, seq_length=128):
        self.tokenizer = tokenizer
        self.seq_length = seq_length
        self.data = []
        for text in texts:
            tokens = tokenizer.encode(text)
            # Split into chunks of length seq_length + 1 (+1 because we predict the next token)
            for i in range(0, len(tokens) - seq_length, seq_length // 2):
                chunk = tokens[i:i + seq_length + 1]
                if len(chunk) == seq_length + 1:
                    self.data.append(chunk)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        chunk = self.data[idx]
        input_ids = torch.tensor(chunk[:-1], dtype=torch.long)
        target_ids = torch.tensor(chunk[1:], dtype=torch.long)
        return input_ids, target_ids
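To make the next-token objective concrete, here is a tiny worked example with hypothetical token IDs: the target sequence is just the input shifted one position to the left.

# Hypothetical chunk of length seq_length + 1 = 5
chunk = [7, 3, 9, 2, 5]
input_ids = chunk[:-1]    # [7, 3, 9, 2] -- what the model sees
target_ids = chunk[1:]    # [3, 9, 2, 5] -- what it must predict at each position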
def train_model(model, dataset, epochs=10, batch_size=32, lr=3e-4):
    """Train the model."""
    # Data loader
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    # Optimizer
    optimizer = optim.AdamW(model.parameters(), lr=lr)
    # Learning-rate scheduler
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    # Loss function
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # ignore <PAD>
    # Training loop
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        progress_bar = tqdm(dataloader, desc=f'Epoch {epoch+1}/{epochs}')
        for batch_idx, (input_ids, target_ids) in enumerate(progress_bar):
            input_ids, target_ids = input_ids.to(device), target_ids.to(device)
            # Forward pass
            logits = model(input_ids)
            # Compute the loss
            # Flatten logits and targets to match what CrossEntropyLoss expects
            loss = criterion(logits.view(-1, logits.size(-1)), target_ids.view(-1))
            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            # Update parameters
            optimizer.step()
            total_loss += loss.item()
            progress_bar.set_postfix({'loss': loss.item()})
        # Update the learning rate
        scheduler.step()
        avg_loss = total_loss / len(dataloader)
        print(f'Epoch {epoch+1}: Average Loss = {avg_loss:.4f}')
        # Save a model checkpoint
        if (epoch + 1) % 5 == 0:
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': avg_loss,
            }, f'model_checkpoint_epoch_{epoch+1}.pth')
    return model
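One convenient way to read the average loss printed above: it is a per-token cross-entropy (in nats), so its exponential is the perplexity. A minimal sketch, assuming an average loss value like the one train_model prints:

import math
avg_loss = 2.1                    # example value printed by train_model
perplexity = math.exp(avg_loss)   # ~8.2: on average the model is as uncertain as a choice among ~8 tokens
print(f'Perplexity: {perplexity:.2f}')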
def load_data(file_path):
    """Load the training data."""
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()
    # Split into smaller text chunks
    chunk_size = 1000
    texts = [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]
    return texts
# Main training function
def main():
    # Load the data
    print("Loading data...")
    texts = load_data('training_data.txt')  # replace with your own training data file
    # Initialize the tokenizer
    print("Building vocabulary...")
    tokenizer = SimpleTokenizer(''.join(texts), vocab_size=512)
    # Create the dataset
    print("Creating dataset...")
    dataset = TextDataset(texts, tokenizer, seq_length=128)
    # Initialize the model
    print("Initializing model...")
    model = SimpleLLM(
        vocab_size=tokenizer.vocab_size,
        d_model=256,
        num_layers=4,
        num_heads=8,
        d_ff=1024,
        max_seq_len=128
    )
    # Print the number of model parameters
    total_params = sum(p.numel() for p in model.parameters())
    print(f"Total parameters: {total_params:,}")
    # Train the model
    print("Starting training...")
    model = train_model(
        model=model,
        dataset=dataset,
        epochs=20,
        batch_size=16,
        lr=3e-4
    )
    # Save the final model
    print("Saving final model...")
    torch.save(model.state_dict(), 'final_model.pth')
    tokenizer.save('tokenizer.json')
    # Test generation
    print("\nTesting generation...")
    prompt = "Once upon a time"
    generated = model.generate(prompt, tokenizer, max_length=50, temperature=0.8)
    print(f"Prompt: {prompt}")
    print(f"Generated: {generated}")

if __name__ == "__main__":
    main()

4. Model Optimization Tips
4.1 Performance Optimization
# Mixed-precision training (if your GPU supports it)
from torch.cuda.amp import autocast, GradScaler

def train_with_amp(model, dataloader, epochs=10, lr=3e-4):
    device = torch.device('cuda')  # AMP as used here requires a CUDA device
    model.to(device)
    scaler = GradScaler()
    optimizer = optim.AdamW(model.parameters(), lr=lr)
    for epoch in range(epochs):
        model.train()
        for input_ids, target_ids in dataloader:
            input_ids, target_ids = input_ids.to(device), target_ids.to(device)
            optimizer.zero_grad()
            with autocast():
                logits = model(input_ids)
                loss = F.cross_entropy(logits.view(-1, logits.size(-1)),
                                       target_ids.view(-1),
                                       ignore_index=0)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
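On PyTorch 2.0+ (which the requirements file already targets), another low-effort speed-up worth trying is torch.compile. This is a minimal sketch, not part of the original training script, and it requires no changes to the training loop itself:

# Optional: compile the model on PyTorch 2.0+
model = SimpleLLM(vocab_size=512)
model = torch.compile(model)  # returns an optimized module wrapper; training code stays the same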
4.2 Larger Model Configurations
class MediumLLM(SimpleLLM):
    """A medium-sized LLM."""
    def __init__(self, vocab_size):
        super().__init__(
            vocab_size=vocab_size,
            d_model=768,       # larger embedding dimension
            num_layers=12,     # more layers
            num_heads=12,      # more attention heads
            d_ff=3072,         # larger feed-forward dimension
            max_seq_len=1024,  # longer sequences
            dropout=0.1
        )

class LargeLLM(SimpleLLM):
    """A large LLM (comparable to GPT-2 Small)."""
    def __init__(self, vocab_size=50257):  # GPT-2 vocabulary size
        super().__init__(
            vocab_size=vocab_size,
            d_model=768,
            num_layers=12,
            num_heads=12,
            d_ff=3072,
            max_seq_len=1024,
            dropout=0.1
        )
5. Deployment and Usage
5.1 Inference Script
def load_model_for_inference(model_path, tokenizer_path, device='cpu'):
    """Load the model for inference."""
    # Load the tokenizer
    tokenizer = SimpleTokenizer()
    tokenizer.load(tokenizer_path)
    # Initialize the model (the hyperparameters must match those used during training)
    model = SimpleLLM(
        vocab_size=tokenizer.vocab_size,
        d_model=256,
        num_layers=4,
        num_heads=8,
        d_ff=1024,
        max_seq_len=128
    )
    # Load the weights
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()
    return model, tokenizer
def interactive_generation(model, tokenizer):
    """Interactive text generation."""
    print("Interactive Text Generation")
    print("Type 'quit' to exit")
    print("-" * 50)
    while True:
        prompt = input("\nEnter prompt: ")
        if prompt.lower() == 'quit':
            break
        max_length = int(input("Max length (default 100): ") or "100")
        temperature = float(input("Temperature (default 1.0): ") or "1.0")
        generated = model.generate(
            prompt,
            tokenizer,
            max_length=max_length,
            temperature=temperature
        )
        print("\nGenerated text:")
        print("-" * 50)
        print(generated)
        print("-" * 50)
# Run inference
if __name__ == "__main__":
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model, tokenizer = load_model_for_inference(
        'final_model.pth',
        'tokenizer.json',
        device
    )
    interactive_generation(model, tokenizer)

6. Caveats and Limitations
6.1 Limitations of the Current Implementation
Limited scale: compared with real LLMs (billions of parameters), this implementation is tiny
Training data requirements: large amounts of high-quality text data are needed
Compute resources: even this small model requires non-trivial compute
No pretraining: the model is trained from scratch, without pretrained weights
6.2 Directions for Extension
Use pretrained weights: load pretrained models from platforms such as Hugging Face
Distributed training: train on multiple GPUs or TPUs
A more efficient tokenizer: implement a BPE or WordPiece tokenizer
Model quantization: shrink the model and speed up inference (a minimal sketch follows this list)
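As a starting point for the quantization direction, here is a minimal sketch using PyTorch's dynamic quantization; it assumes a trained SimpleLLM instance named model and is an illustration of the idea, not a tuned recipe:

# Dynamic quantization: store Linear weights in int8 and dequantize on the fly (CPU inference)
import torch
import torch.nn as nn
quantized_model = torch.quantization.quantize_dynamic(
    model,            # a trained SimpleLLM instance
    {nn.Linear},      # module types to quantize
    dtype=torch.qint8
)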
7. Estimating Resource Requirements
| Model size | Parameters | GPU memory | Training time | Suitable dataset |
|---|---|---|---|---|
| Small (this implementation) | ~5M | 2-4 GB | hours | small text corpora |
| Medium | ~100M | 8-16 GB | days | Wikipedia |
| Large | ~1B | 32 GB+ | weeks | large corpora |
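To see roughly where such parameter counts come from, here is a back-of-the-envelope estimator (my own rough sketch; it ignores biases and layer norms, so it slightly undercounts compared with the exact total printed by main()):

def approx_param_count(vocab_size, d_model, num_layers, d_ff):
    embeddings = vocab_size * d_model   # token embedding table
    attention = 4 * d_model * d_model   # W_q, W_k, W_v, W_o per layer
    ffn = 2 * d_model * d_ff            # linear1 + linear2 per layer
    output = d_model * vocab_size       # output projection
    return embeddings + num_layers * (attention + ffn) + output

# The small configuration from main() comes out to roughly 3.4 million parameters
print(approx_param_count(vocab_size=512, d_model=256, num_layers=4, d_ff=1024))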