免费POC, 零成本试错
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


在Colab中微调Qwen3-4B模型实战指南

发布日期:2025-09-20 18:01:39 浏览次数: 1570
作者:NothingCode

微信搜一搜,关注“NothingCode”

推荐语

手把手教你用Colab免费微调Qwen3-4B模型,轻松实现本地大模型部署!

核心内容:
1. 详细的环境配置与GPU资源检查步骤
2. 从下载模型到启动服务的完整流程
3. 数据集构建与模型验证的关键操作

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

一、配置环境

1. 安装 unsloth

pip install unsloth -i https://pypi.tuna.tsinghua.edu.cn/simple

2. 查看 GPU 信息

import torch
def print_cuda_info():    try:        print("-" * 40)        print("PyTorch CUDA Environment Information:")        print("-" * 40)        if torch.cuda.is_available():            device_count = torch.cuda.device_count()            print(f"Number of CUDA devices: {device_count}")            if device_count > 0:                device_name = torch.cuda.get_device_name(0)                print(f"0th CUDA Device Name: {device_name}")                total_memory = torch.cuda.get_device_properties(0).total_memory                allocated_memory = torch.cuda.memory_allocated(0)                free_memory = total_memory - allocated_memory                print(f"Total Memory: {total_memory / (1024 ** 3):.2f} GB")                print(f"Allocated Memory: {allocated_memory / (1024 ** 3):.2f} GB")                print(f"Free Memory: {free_memory / (1024 ** 3):.2f} GB")            else:                print("No CUDA devices found.")        else:            print("CUDA is not available.")        print("-" * 40)    except Exception as e:        print("-" * 40)        print(f"An error occurred: {e}")        print("-" * 40)
if __name__ == "__main__":    print_cuda_info()

3. 安装 Ollama

curl -fsSL https://ollama.com/install.sh | sh

4. 启动 ollama 服务

ollama serve &

5. 下载 Qwen3-4B 镜像

ollama pull qwen3:4B

(下载完成后需结束当前运行进程)

二、构建数据集

1. 加载模型和数据集(验证可用性)

from unsloth import FastLanguageModelfrom datasets import load_datasetimport torch
# 配置max_seq_length = 2048load_in_4bit = True  # 4bit量化
# 从 Hugging Face Hub 加载模型和分词器model, tokenizer = FastLanguageModel.from_pretrained(    "Qwen/Qwen3-4B-Instruct-2507",  # 模型名称    max_seq_length=max_seq_length,    load_in_4bit=load_in_4bit,    trust_remote_code=True  # Qwen模型需此参数)
# 配置 LoRA 适配器model = FastLanguageModel.get_peft_model(    model,    r=16,    target_modules=["q_proj","k_proj","v_proj","o_proj",                    "gate_proj","up_proj","down_proj",],    lora_alpha=16,    lora_dropout=0,    bias="none",    use_gradient_checkpointing="unsloth",    random_state=3407,    use_rslora=False,    loftq_config=None,)
EOS_TOKEN = tokenizer.eos_token
# 加载数据集(需修改为你的数据集路径)dataset = load_dataset("json", data_files="/content/noli.json", split="train")
# 查看数据集信息print("数据集样例:", dataset[0])print("数据集大小:", len(dataset))

三、模型微调

1. 完整微调代码

# 导入必要的库from unsloth import FastLanguageModelfrom datasets import load_dataset, Datasetfrom trl import SFTTrainerfrom transformers import TrainingArgumentsfrom unsloth import is_bfloat16_supportedimport torch
# --- 1. 模型和分词器加载 ---print("正在加载模型和分词器...")model, tokenizer = FastLanguageModel.from_pretrained(    model_name="Qwen/Qwen3-4B-Instruct-2507",    max_seq_length=2048,    load_in_4bit=True,    trust_remote_code=True  # Qwen 模型需要此参数)print("模型和分词器加载完成。")
# --- 2. LoRA 配置 ---print("正在配置LoRA适配器...")model = FastLanguageModel.get_peft_model(    model,    r=16,    target_modules=["q_proj", "k_proj", "v_proj", "o_proj",                    "gate_proj", "up_proj", "down_proj"],    lora_alpha=16,    lora_dropout=0,    bias="none",    use_gradient_checkpointing="unsloth",    random_state=3407,    use_rslora=False,    loftq_config=None,)print("LoRA适配器配置完成。")
# --- 3. 数据集加载 ---print("正在加载数据集...")raw_dataset = load_dataset("json", data_files="/content/NOLI.json", split="train")  # 修改为你的数据集路径print(f"原始数据集加载完成。数据集大小: {len(raw_dataset)}")print("原始数据集样例:", raw_dataset[0])
# --- 4. 预处理数据集:添加 'text' 列 ---def create_text_column(example):    """将单个样本格式化为模型训练所需的文本格式。"""    # 安全地获取字段,确保是字符串    instruction = str(example.get("instruction", "")).strip()    input_text = str(example.get("input", "")).strip()    output_text = str(example.get("output", "")).strip()
    # 构建用户部分    if input_text:        user_content = f"{instruction}\n{input_text}"    else:        user_content = instruction
    # 构建完整的提示(符合Qwen3对话格式)    full_prompt = (        f"<|im_start|>user\n{user_content}<|im_end|>\n"        f"<|im_start|>assistant\n{output_text}<|im_end|>"    )    return {"text": full_prompt}
print("正在预处理数据集,添加 'text' 列...")# 使用 map 函数为数据集中的每个样本添加 'text' 列dataset = raw_dataset.map(create_text_column)print("数据集预处理完成。")print("处理后数据集样例:", dataset[0])
# --- 5. 配置并创建 SFTTrainer ---print("正在配置SFTTrainer...")trainer = SFTTrainer(    model=model,    tokenizer=tokenizer,    train_dataset=dataset,  # 使用预处理后的数据集    dataset_text_field="text",  # 指定使用 'text' 列    max_seq_length=2048,    dataset_num_proc=2,    packing=False,  # 此格式下禁用packing    args=TrainingArguments(        per_device_train_batch_size=2,        gradient_accumulation_steps=4,        warmup_steps=5,        max_steps=100,  # 可根据需求调整训练步数        learning_rate=2e-4,        fp16=not is_bfloat16_supported(),        bf16=is_bfloat16_supported(),        logging_steps=5,        optim="adamw_8bit",        output_dir="./qwen_finetune_output_v2",        overwrite_output_dir=True,        report_to="none",  # 禁用外部日志记录        seed=3407,    ),)print("SFTTrainer配置完成。")
# --- 6. 开始训练 ---print("开始训练...")trainer.train()print("训练完成。")
# --- 7. 保存 LoRA 权重 ---print("正在保存LoRA适配器权重...")model.save_pretrained("./lora_adapters_v2")tokenizer.save_pretrained("./lora_adapters_v2")  # 同时保存分词器配置print("LoRA适配器已保存到 './lora_adapters_v2' 目录。")

2. 合并 LoRA 权重(保存为 32 位和 16 位模型)

# 导入必要的库from unsloth import FastLanguageModelimport torchfrom peft import PeftModel  # 用于加载和合并LoRA权重
# --- 1. 加载基础模型(非4bit量化,用于完整权重合并)---print("正在加载基础模型...")model, tokenizer = FastLanguageModel.from_pretrained(    model_name="Qwen/Qwen3-4B-Instruct-2507",    max_seq_length=2048,    load_in_4bit=False,  # 加载完整精度模型    trust_remote_code=True)print("基础模型加载完成。")
# --- 2. 加载 LoRA 适配器 ---print("正在加载LoRA适配器...")model = PeftModel.from_pretrained(    model=model,    model_id="/content/lora_adapters_v2"  # LoRA权重保存路径)print("LoRA适配器加载完成。")
# --- 3. 合并 LoRA 权重到基础模型 ---print("正在合并LoRA权重到基础模型...")model = model.merge_and_unload()  # 执行权重合并print("权重合并完成。")
# --- 4. 保存完整模型(32位和16位)---print("正在保存完整模型...")# 保存为32位完整模型(高精度,体积较大)model.save_pretrained("./qwen_merged_full_model")tokenizer.save_pretrained("./qwen_merged_full_model")
# 保存为16位模型(平衡精度与体积)model.save_pretrained("./qwen_merged_full_model_16bit", torch_dtype=torch.float16)tokenizer.save_pretrained("./qwen_merged_full_model_16bit")
print("完整模型保存完成!")print("32位模型保存路径:./qwen_merged_full_model")print("16位模型保存路径:./qwen_merged_full_model_16bit")

四、使用微调后的 16 位模型

from transformers import AutoModelForCausalLM, AutoTokenizer, TextStreamerimport torchfrom typing import List, Tuplefrom threading import Thread
# --- 1. 加载模型和分词器 ---model_path = "./qwen_merged_full_model_16bit"  # 16位模型路径print(f"正在加载模型: {model_path}...")tokenizer = AutoTokenizer.from_pretrained(    model_path,    trust_remote_code=True,    padding_side="left")# 确保pad_token存在(使用eos_token作为pad_token)if tokenizer.pad_token is None:    tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(    model_path,    torch_dtype=torch.float16,    device_map="auto",  # 自动分配设备(优先GPU)    trust_remote_code=True)model.eval()  # 推理模式print("模型加载完成,可开始对话。")
# --- 2. 自定义流式输出器(仅打印新生成内容)---class CurrentResponseStreamer(TextStreamer):    def __init__(self, tokenizer, input_prompt_length: int, skip_prompt: bool = True, **decode_kwargs):        super().__init__(tokenizer, skip_prompt=skip_prompt, **decode_kwargs)        self.input_prompt_length = input_prompt_length        self.first_token = True
    def on_finalized_text(self, text: str, stream_end: bool = False):        if self.first_token:            print("Noli: ", end="", flush=True)            self.first_token = False        print(text, end="", flush=True)        if stream_end:            print()  # 结束时换行
# --- 3. 流式生成函数 ---def generate_response_streaming(conversation_history: List[Tuple[str, str]]):    """根据对话历史生成流式响应,并返回完整响应文本"""    # 构建完整对话prompt    prompt = ""    for role, content in conversation_history:        if role == "user":            prompt += f"<|im_start|>user\n{content}<|im_end|>\n"        else:            prompt += f"<|im_start|>assistant\n{content}<|im_end|>\n"    prompt += f"<|im_start|>assistant\n"  # 启动助手回复生成
    # 计算输入prompt的token长度    input_ids = tokenizer(prompt, return_tensors="pt")["input_ids"]    input_prompt_length = input_ids.shape[1]
    # 移动输入到模型设备    inputs = {        "input_ids": input_ids.to(model.device),        "attention_mask": tokenizer(prompt, return_tensors="pt")["attention_mask"].to(model.device),    }
    # 初始化流式输出器    streamer = CurrentResponseStreamer(        tokenizer,        input_prompt_length=input_prompt_length,        skip_special_tokens=True    )
    # 启动流式生成(独立线程)    generation_kwargs = dict(        **inputs,        max_new_tokens=512,        temperature=0.7,        top_p=0.9,        repetition_penalty=1.1,        eos_token_id=tokenizer.eos_token_id,        pad_token_id=tokenizer.pad_token_id,        do_sample=True,        streamer=streamer    )    thread = Thread(target=model.generate, kwargs=generation_kwargs)    thread.start()    thread.join()  # 等待生成完成
    # 生成完整响应文本(用于更新对话历史)    with torch.no_grad():        outputs = model.generate(            **inputs,            max_new_tokens=512,            temperature=0.7,            top_p=0.9,            repetition_penalty=1.1,            eos_token_id=tokenizer.eos_token_id,            pad_token_id=tokenizer.pad_token_id,            do_sample=True        )    # 仅解码新生成的部分    generated_ids = outputs[:, inputs['input_ids'].shape[1]:]    full_response = tokenizer.decode(generated_ids[0], skip_special_tokens=True)    return full_response
# --- 4. 对话主程序 ---if __name__ == "__main__":    conversation_history: List[Tuple[str, str]] = []    print("你好!我是诺丽,一个AI助手。输入'退出'即可结束对话。")
    while True:        try:            user_input = input("\n你: ").strip()            if not user_input:                continue            if user_input.lower() in ["退出", "quit", "exit"]:                print("Noli: 再见!很高兴与你交谈。")                break
            # 更新对话历史            conversation_history.append(("user", user_input))            # 流式生成响应            current_response = generate_response_streaming(conversation_history)            # 保存完整响应到历史            conversation_history.append(("assistant", current_response))
        except KeyboardInterrupt:            print("\n\nNoli: 看起来你中断了对话。再见!")            break        except Exception as e:            print(f"\nNoli: 抱歉,处理你的请求时出现了错误: {e}")

五、常见问题解决

1. GPU 显存占满、资源无法释放

解决办法:重启 Colab 会话(菜单栏 -> Runtime -> Restart session)

六、扩展:保存 LoRA 权重到谷歌云盘

from google.colab import driveimport osimport shutil
# 1. 挂载 Google 云盘print("正在挂载 Google 云盘...")if not os.path.ismount('/content/drive'):    drive.mount('/content/drive')    print("Google 云盘已挂载。")else:    print("Google 云盘已挂载。")
# 2. 定义路径(可修改目标路径)source_dir_path = '/content/lora_adapters_v2'  # LoRA权重源路径destination_folder_path = '/content/drive/MyDrive/lora'  # 云盘目标文件夹destination_dir_path = os.path.join(destination_folder_path, os.path.basename(source_dir_path))
# 配置:是否覆盖已存在的目标目录OVERWRITE_EXISTING = True
# 3. 创建目标文件夹(若不存在)os.makedirs(destination_folder_path, exist_ok=True)print(f"目标备份文件夹: {destination_folder_path}")
# 4. 检查源目录有效性if not os.path.exists(source_dir_path) or not os.path.isdir(source_dir_path):    print(f"❌ 错误:源目录不存在或不是目录 - {source_dir_path}")else:    try:        # 计算源目录大小        source_size_mb = sum(            os.path.getsize(os.path.join(dirpath, filename))             for dirpath, dirnames, filenames in os.walk(source_dir_path)             for filename in filenames        ) / (1024 * 1024)        print(f"✅ 找到源目录: {source_dir_path} (估算大小: {source_size_mb:.2f} MB)")
        # 处理目标目录已存在的情况        if os.path.exists(destination_dir_path):            dest_size_mb = sum(                os.path.getsize(os.path.join(dirpath, filename))                 for dirpath, dirnames, filenames in os.walk(destination_dir_path)                 for filename in filenames            ) / (1024 * 1024)            print(f"⚠️  警告:目标目录已存在 - {destination_dir_path} (估算大小: {dest_size_mb:.2f} MB)")            if OVERWRITE_EXISTING:                print("正在删除旧目录以覆盖...")                shutil.rmtree(destination_dir_path)                print("旧目录已删除。")            else:                print("OVERWRITE_EXISTING=False,跳过复制。")                print("如需覆盖,请将 OVERWRITE_EXISTING 设为 True。")
        # 复制目录到云盘        if OVERWRITE_EXISTING or not os.path.exists(destination_dir_path):            print(f"正在复制目录到云盘...")            print(f"  源: {source_dir_path}")            print(f"  目标: {destination_dir_path}")            shutil.copytree(source_dir_path, destination_dir_path)            print(f"✅ 目录已备份到云盘: {destination_dir_path}")        else:            print("操作已取消或跳过。")
    except Exception as e:        print(f"❌ 复制错误: {e}")

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询