0%

CS336 assignment5

我个人对实验五的全部实现代码在:Stanford_CS336_Assignment5_2025Spring

Assignment5的目的是让我们对Qwen 2.5 Math 1.5B这个模型进行SFT以及RL训练。

利用vLLM进行模型推理

主要目的是熟悉vllm的部分接口,通过vllm这个原生推理加速框架来对llm进行快速的inference操作。同时为后面SFT训练的时候使用vLLM框架做推理测试提供一个可复用的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from transformers import AutoTokenizer, AutoModelForCausalLM
from vllm import LLM, SamplingParams
from typing import Callable, List
from cs336_alignment.drgrpo_grader import r1_zero_reward_fn
import json
import os

model_path = "/gz-data/models/Qwen2.5-Math-1.5B"
MATH_DATASET_PATH = "/root/yjx/assignment5-alignment/data/MATH/validation.jsonl"
PROMPT_TEMPLATE_PATH = "/root/yjx/assignment5-alignment/cs336_alignment/prompts/r1_zero.prompt"

def load_prompts(file_path: str,
template_path: str) -> List[str]:
'''
Load prompts from a JSONL file and return them as a list of strings.
'''
prompts = []
answers = []

with open(template_path, 'r') as f:
template = f.read()

with open(file_path, 'r') as f:
for line in f:
data = json.loads(line)
question = data['problem']
answer = data['answer']
prompt = template.replace("{question}", question)
prompts.append(prompt)
answers.append(answer)
return prompts, answers


def evaluate_vllm(
vllm_model: LLM,
reward_fn: Callable[[str, str], dict[str, float]],
prompts: List[str],
groundtruths: List[str],
eval_sampling_params: SamplingParams,
save_path: str = "/root/yjx/assignment5-alignment/cs336_alignment/zero_shot_results"
) -> None:
"""
Evaluate a language model on a list of prompts,
compute evaluation metrics, and serialize results to disk.
"""
count_all_correct = 0 # (1) format=1, answer=1
count_format_only = 0 # (2) format=1, answer=0
count_fail = 0 # (3) format=0, answer=0

format_fails = []
answer_fails = []

outputs = vllm_model.generate(prompts, sampling_params=eval_sampling_params)

if not os.path.exists(save_path):
os.makedirs(save_path)

with open(os.path.join(save_path, "results.jsonl"), 'w') as f:
for prompt, output, groundtruth in zip(prompts, outputs, groundtruths):
result = output.outputs[0].text
reward = reward_fn(result, groundtruth)
if reward["format_reward"] == 1 and reward["answer_reward"] == 1:
count_all_correct += 1
elif reward["format_reward"] == 1 and reward["answer_reward"] == 0:
count_format_only += 1
elif reward["format_reward"] == 0 and reward["answer_reward"] == 0:
count_fail += 1

# 方便来观察 10个 format=0 的例子和 10 个 answer=0 的例子
if reward["format_reward"] == 0 and len(format_fails) < 10:
format_fails.append({"res": result, "gt": groundtruth})
elif reward["format_reward"] == 1 and reward["answer_reward"] == 0 and len(answer_fails) < 10:
answer_fails.append({"res": result, "gt": groundtruth})

f.write(json.dumps({
"prompt": prompt,
"result": result,
"groundtruth": groundtruth,
"rewards": reward
}) + "\n")

# Compute and print evaluation metrics
total = len(prompts)
print(f"Total Prompts: {total}")
print(f"All Correct: {count_all_correct} ({count_all_correct / total:.2%})")
print(f"Format Only: {count_format_only} ({count_format_only / total:.2%})")
print(f"Fail: {count_fail} ({count_fail / total:.2%})")

# 存储bad case
with open(os.path.join(save_path, "format_fails.json"), 'w') as f:
json.dump(format_fails, f, indent=4)
with open(os.path.join(save_path, "answer_fails.json"), 'w') as f:
json.dump(answer_fails, f, indent=4)



if __name__ == "__main__":
prompts, answers = load_prompts(MATH_DATASET_PATH, PROMPT_TEMPLATE_PATH)
llm = LLM(model=model_path)
sampling_params = SamplingParams(temperature=1, top_p=1, max_tokens=1024, stop=["</answer>"], include_stop_str_in_output=True)
evaluate_vllm(llm, r1_zero_reward_fn, prompts, answers, sampling_params)

实现SFT微调Qwen 2.5 Math 1.5B

对于给定的输入 promptoutput 文本对,利用预训练的 tokenizer 分别对它们进行编码,然后将他们concat在一起作为训练的输入序列。

由于每个输入的长度不一,这里采用了朴素的找到最长的输入序列,将其余序列给padding到和它相同长度的方法进行填充。同时生成对应的mask,用来判别哪些是输入以及padding的token,哪些是模型最终输出的token。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def tokenize_prompt_and_output(
prompt_strs: list[str],
output_strs: list[str],
tokenizer: PreTrainedTokenizer,
) -> dict[str, torch.Tensor]:
'''
Tokenize the prompt and output strings, and construct a mask that is 1 for the response tokens and 0 for
other tokens (prompt or padding).
'''
pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0

prompt_encodings = [tokenizer.encode(p) for p in prompt_strs]
output_encodings = [tokenizer.encode(o) for o in output_strs]
full_encodings = []
full_mask = []
for p, o in zip(prompt_encodings, output_encodings):
full_encodings.append(p + o)
full_mask.append([0]*len(p) + [1]*len(o))

max_len = max(len(enc) for enc in full_encodings)
for enc, mask in zip(full_encodings, full_mask):
pad_len = max_len - len(enc)
enc.extend([pad_id] * pad_len)
mask.extend([0] * pad_len)

input_ids = torch.tensor([enc[:-1] for enc in full_encodings], dtype=torch.long) # B L
labels = torch.tensor([enc[1:] for enc in full_encodings], dtype=torch.long) # B L
response_mask = torch.tensor([mask[1:] for mask in full_mask], dtype=torch.long) # B L

return {
"input_ids": input_ids,
"labels": labels,
"response_mask": response_mask
}

接下来需要知道每一个token的条件概率。通过model的前向传播,我们可以得到每一个token的logits,然后通过log_softmax可以得到每一个token的条件概率。最后通过gather以及labels就可以得到每一个token的条件概率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def get_response_log_probs(
model: PreTrainedModel,
input_ids: torch.Tensor,
labels: torch.Tensor,
return_token_entropy: bool = False,
) -> dict[str, torch.Tensor]:
logits = model(input_ids).logits # B L V
log_probs = torch.nn.functional.log_softmax(logits, dim=-1) # B L V
labels_expanded = labels.unsqueeze(-1) # B L 1
log_probs_at_labels = torch.gather(log_probs, dim=-1, index=labels_expanded).squeeze(-1) # B L

if return_token_entropy:
token_entropy = compute_entropy(logits) # B L
return {
"log_probs": log_probs_at_labels,
"token_entropy": token_entropy
}
else:
return {
"log_probs": log_probs_at_labels
}

在得到对应token的条件概率之后,就可以来进行训练了。在这里得到的log_probs是整个序列的条件概率,因此需要使用之前处理的response_mask来过滤掉prompt和padding的token。由于大模型在微调的时候,通常是使用cross-entropy loss,因此这里计算的是负的log_probs。最后,为了得到每个batch的平均loss,需要除以batchsizegradient_accumulation_steps

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def masked_normalize(
tensor: torch.Tensor,
mask: torch.Tensor,
normalize_constant: float,
dim: int | None = None,
) -> torch.Tensor:
masked_tensor = tensor * mask
masked_sum = masked_tensor.sum(dim=dim)
normalized = masked_sum / normalize_constant
return normalized

def sft_microbatch_train_step(
policy_log_probs: torch.Tensor,
response_mask: torch.Tensor,
gradient_accumulation_steps: int,
normalize_constant: float = 1.0,
) -> tuple[torch.Tensor, dict[str, torch.Tensor]]:
loss_tensor = -policy_log_probs
batchsize = policy_log_probs.shape[0]
loss = masked_normalize(loss_tensor, response_mask, normalize_constant)
loss /= (batchsize*gradient_accumulation_steps)
loss.backward()

# 计算平均每个回答 Token 的交叉熵大小
masked_loss = loss_tensor * response_mask
token_loss = masked_loss.sum() / response_mask.sum().clamp(min=1)

metadata = {
"loss": token_loss.item(),
"num_tokens": response_mask.sum().item()
}
return loss, metadata

在得到了这些辅助函数之后就可以编写微调代码了,代码在sft_qwen_math.py

实验结果也是符合assignment的预期的。在使用全部的sft.jsonl数据进行微调的时候,在validation.jsonl上能够达到超过15%的准确率:

raw_sft_math

微调训练曲线以及测试集效果

同时按照assignment里面的要求,利用提供的r1 reward对正确的数据进行了过滤,利用过滤后的数据进行训练可以明显看出在验证集上的效果有提升:

filtered_sft

在过滤后数据上的训练和测试效果

专家迭代

专家迭代(Expert Iteration)是一种利用模型性能自举进行SFT的手段。

  1. 模型初始化
    将当前策略模型 $\pi_{\theta}$ 设定为初始模型 $\pi_{\theta_{\text{init}}}$。

  2. 迭代循环 (执行 n_ei_steps 次):

    • 采样问题:从总任务集 $\mathcal{D}$ 中随机抽取一批问题 $\mathcal{D}_b$。
    • 备份模型:将当前的策略模型记录为旧模型 $\pi_{\theta_{\text{old}}}$,作为本轮生成数据的基准。
    • 生成样本 (Sampling):针对 $\mathcal{D}b$ 中的每个问题 $q$,利用旧模型采样生成 $G$ 个不同的回答 ${o^{(i)}}{i=1}^G$。
    • 计算奖励 (Scoring):运行奖励函数 $R(q, o^{(i)})$。如果回答正确,则奖励 $r^{(i)} > 0$(通常为 1);如果错误,则为 0。
    • 构建微调数据集 (Filtering):过滤掉所有错误的回答(即 $r^{(i)} = 0$ 的样本),仅保留正确的问题-回答对,形成一个有监督微调数据集 $\mathcal{D}_{\text{sft}}$。
    • 策略更新 (SFT):使用 $\mathcal{D}{\text{sft}}$ 对当前模型 $\pi{\theta}$ 进行有监督微调(Supervised Fine-Tuning)。这一步是让模型向自己产生的“正确答案”学习。
  3. 输出
    迭代结束,输出优化后的策略模型 $\pi_{\theta}$。

通过专家迭代采样以及对应的奖励函数来生成用来自举的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def expert_iteration(policy_model: LLM, reward_fn: Callable, sampling_params: SamplingParams, prompts: List[str], groundtruths: List[str], ei_data_save_path: str = "/root/yjx/assignment5-alignment/data/MATH/"):
ei_data_save_path = os.path.join(ei_data_save_path, "ei_data.jsonl")
# old_policy_output = policy_model.generate(prompts, sampling_params=sampling_params)
old_policy_output = []
chunk_size = 64

with tqdm(total=len(prompts), desc="vLLM Generating") as pbar:
for i in range(0, len(prompts), chunk_size):
batch_prompts = prompts[i:i+chunk_size]
# 每次完成 64 个输入就滚动一次进度条
batch_outputs = policy_model.generate(batch_prompts, sampling_params=sampling_params, use_tqdm=False)
old_policy_output.extend(batch_outputs)
pbar.update(len(batch_prompts))

with open(ei_data_save_path, 'a') as f:
for req_output, gt in zip(old_policy_output, groundtruths):
unique_responses = set()
for res in req_output.outputs:
result = res.text
# 去重:过滤掉完全重复的文本,但保留“殊途同归”的不同推理路径 (Rationale Diversity)
if result in unique_responses:
continue
unique_responses.add(result)

reward = reward_fn(result, gt)
if reward["reward"] == 1:
json.dump({
"prompt": req_output.prompt,
"response": result,
"ground_truth": gt
}, f)
f.write('\n')

本质上就是使用模型自己生成的数据进行微调。通过这样的策略能够让模型从自我生成的数据中进行学习,从而提升模型的效果。微调的代码在ei_sft_qwen_math.py

训练曲线如下图所示,可以明显的看出,经过自举训练的模型效果显著的要高于仅仅用普通SFT的模型:

EI_sft

专家迭代模型的训练和测试效果

强化学习

这里使用了GRPO作为这个实验的RL训练方式。GRPO的优势在于他去掉了价值模型,即Critic模型,而是直接使用组内相对优势来更新策略模型。对于强化学习的原理在这里不进行赘述,在训练过程中需要注意的点在于,

GRPO_RL

GRPO RL模型的训练和测试效果