A hands-on guide to fine-tuning Qwen3-4B on Colab for free and deploying the model locally with ease! Core content: 1. Detailed environment setup and GPU resource checks 2. The complete workflow from downloading the model to launching the service 3. Key steps for dataset construction and model validation
pip install unsloth -i https://pypi.tuna.tsinghua.edu.cn/simple
import torch

def print_cuda_info():
    try:
        print("-" * 40)
        print("PyTorch CUDA Environment Information:")
        print("-" * 40)
        if torch.cuda.is_available():
            device_count = torch.cuda.device_count()
            print(f"Number of CUDA devices: {device_count}")
            if device_count > 0:
                device_name = torch.cuda.get_device_name(0)
                print(f"0th CUDA Device Name: {device_name}")
                total_memory = torch.cuda.get_device_properties(0).total_memory
                allocated_memory = torch.cuda.memory_allocated(0)
                free_memory = total_memory - allocated_memory
                print(f"Total Memory: {total_memory / (1024 ** 3):.2f} GB")
                print(f"Allocated Memory: {allocated_memory / (1024 ** 3):.2f} GB")
                print(f"Free Memory: {free_memory / (1024 ** 3):.2f} GB")
            else:
                print("No CUDA devices found.")
        else:
            print("CUDA is not available.")
        print("-" * 40)
    except Exception as e:
        print("-" * 40)
        print(f"An error occurred: {e}")
        print("-" * 40)

if __name__ == "__main__":
    print_cuda_info()
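On a free Colab T4 runtime this script prints something along these lines (the memory figures below are illustrative, not exact):

----------------------------------------
PyTorch CUDA Environment Information:
----------------------------------------
Number of CUDA devices: 1
0th CUDA Device Name: Tesla T4
Total Memory: 14.74 GB
Allocated Memory: 0.00 GB
Free Memory: 14.74 GB
----------------------------------------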
curl -fsSL https://ollama.com/install.sh | sh
ollama serve &
ollama pull qwen3:4b
(Once the download finishes, stop the ollama serve process that is still running in the background.)
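One way to stop it, assuming ollama serve was started as a background job of the current shell as above (a sketch):

# Stop the background ollama serve process
pkill -f "ollama serve"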
from unsloth import FastLanguageModel
from datasets import load_dataset
import torch
# Configuration
max_seq_length = 2048
load_in_4bit = True  # 4-bit quantization

# Load the model and tokenizer from the Hugging Face Hub
model, tokenizer = FastLanguageModel.from_pretrained(
    "Qwen/Qwen3-4B-Instruct-2507",  # model name
    max_seq_length=max_seq_length,
    load_in_4bit=load_in_4bit,
    trust_remote_code=True  # required for Qwen models
)
# Configure the LoRA adapter
model = FastLanguageModel.get_peft_model(
    model,
    r=16,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj",
                    "gate_proj", "up_proj", "down_proj"],
    lora_alpha=16,
    lora_dropout=0,
    bias="none",
    use_gradient_checkpointing="unsloth",
    random_state=3407,
    use_rslora=False,
    loftq_config=None,
)
EOS_TOKEN = tokenizer.eos_token
# Load the dataset (change this to your own dataset path)
dataset = load_dataset("json", data_files="/content/noli.json", split="train")

# Inspect the dataset
print("Dataset sample:", dataset[0])
print("Dataset size:", len(dataset))
# Import the required libraries
from unsloth import FastLanguageModel
from datasets import load_dataset, Dataset
from trl import SFTTrainer
from transformers import TrainingArguments
from unsloth import is_bfloat16_supported
import torch

# --- 1. Load the model and tokenizer ---
print("Loading model and tokenizer...")
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name="Qwen/Qwen3-4B-Instruct-2507",
    max_seq_length=2048,
    load_in_4bit=True,
    trust_remote_code=True  # required for Qwen models
)
print("Model and tokenizer loaded.")

# --- 2. LoRA configuration ---
print("Configuring the LoRA adapter...")
model = FastLanguageModel.get_peft_model(
    model,
    r=16,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj",
                    "gate_proj", "up_proj", "down_proj"],
    lora_alpha=16,
    lora_dropout=0,
    bias="none",
    use_gradient_checkpointing="unsloth",
    random_state=3407,
    use_rslora=False,
    loftq_config=None,
)
print("LoRA adapter configured.")

# --- 3. Load the dataset ---
print("Loading dataset...")
raw_dataset = load_dataset("json", data_files="/content/noli.json", split="train")  # change this to your own dataset path
print(f"Raw dataset loaded. Dataset size: {len(raw_dataset)}")
print("Raw dataset sample:", raw_dataset[0])
# --- 4. Preprocess the dataset: add a 'text' column ---
def create_text_column(example):
    """Format a single sample into the text layout used for training."""
    # Fetch fields defensively and coerce them to strings
    instruction = str(example.get("instruction", "")).strip()
    input_text = str(example.get("input", "")).strip()
    output_text = str(example.get("output", "")).strip()
    # Build the user turn
    if input_text:
        user_content = f"{instruction}\n{input_text}"
    else:
        user_content = instruction
    # Build the full prompt (Qwen3 chat format)
    full_prompt = (
        f"<|im_start|>user\n{user_content}<|im_end|>\n"
        f"<|im_start|>assistant\n{output_text}<|im_end|>"
    )
    return {"text": full_prompt}

print("Preprocessing the dataset and adding a 'text' column...")
# Use map to add the 'text' column to every sample in the dataset
dataset = raw_dataset.map(create_text_column)
print("Dataset preprocessing finished.")
print("Processed dataset sample:", dataset[0])
# --- 5. Configure and create the SFTTrainer ---
print("Configuring the SFTTrainer...")
trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    train_dataset=dataset,  # use the preprocessed dataset
    dataset_text_field="text",  # train on the 'text' column
    max_seq_length=2048,
    dataset_num_proc=2,
    packing=False,  # disable packing for this format
    args=TrainingArguments(
        per_device_train_batch_size=2,
        gradient_accumulation_steps=4,
        warmup_steps=5,
        max_steps=100,  # adjust the number of training steps as needed
        learning_rate=2e-4,
        fp16=not is_bfloat16_supported(),
        bf16=is_bfloat16_supported(),
        logging_steps=5,
        optim="adamw_8bit",
        output_dir="./qwen_finetune_output_v2",
        overwrite_output_dir=True,
        report_to="none",  # disable external logging
        seed=3407,
    ),
)
print("SFTTrainer configured.")

# --- 6. Start training ---
print("Starting training...")
trainer.train()
print("Training finished.")

# --- 7. Save the LoRA weights ---
print("Saving the LoRA adapter weights...")
model.save_pretrained("./lora_adapters_v2")
tokenizer.save_pretrained("./lora_adapters_v2")  # also save the tokenizer config
print("LoRA adapter saved to './lora_adapters_v2'.")
# Import the required libraries
from unsloth import FastLanguageModel
import torch
from peft import PeftModel  # for loading and merging the LoRA weights

# --- 1. Load the base model (not 4-bit quantized, for a full-precision merge) ---
print("Loading the base model...")
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name="Qwen/Qwen3-4B-Instruct-2507",
    max_seq_length=2048,
    load_in_4bit=False,  # load the full-precision model
    trust_remote_code=True
)
print("Base model loaded.")

# --- 2. Load the LoRA adapter ---
print("Loading the LoRA adapter...")
model = PeftModel.from_pretrained(
    model=model,
    model_id="/content/lora_adapters_v2"  # path where the LoRA weights were saved
)
print("LoRA adapter loaded.")

# --- 3. Merge the LoRA weights into the base model ---
print("Merging the LoRA weights into the base model...")
model = model.merge_and_unload()  # perform the weight merge
print("Merge finished.")

# --- 4. Save the full model (full precision and 16-bit) ---
print("Saving the full model...")
# Save the full-precision merged model (higher precision, larger on disk)
model.save_pretrained("./qwen_merged_full_model")
tokenizer.save_pretrained("./qwen_merged_full_model")

# Save a 16-bit copy (a balance between precision and size).
# Note: save_pretrained ignores a torch_dtype argument, so convert
# the weights to float16 before saving.
model = model.to(torch.float16)
model.save_pretrained("./qwen_merged_full_model_16bit")
tokenizer.save_pretrained("./qwen_merged_full_model_16bit")

print("Full model saved!")
print("Full-precision model path: ./qwen_merged_full_model")
print("16-bit model path: ./qwen_merged_full_model_16bit")
from transformers import AutoModelForCausalLM, AutoTokenizer, TextStreamer
import torch
from typing import List, Tuple
from threading import Thread
# --- 1. Load the model and tokenizer ---
model_path = "./qwen_merged_full_model_16bit"  # path to the 16-bit model
print(f"Loading model: {model_path}...")
tokenizer = AutoTokenizer.from_pretrained(
    model_path,
    trust_remote_code=True,
    padding_side="left"
)
# Make sure a pad_token exists (fall back to the eos_token)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

model = AutoModelForCausalLM.from_pretrained(
    model_path,
    torch_dtype=torch.float16,
    device_map="auto",  # place layers automatically (GPU first)
    trust_remote_code=True
)
model.eval()  # inference mode
print("Model loaded; you can start chatting.")
# --- 2. Custom streamer (prints only the newly generated text) ---
class CurrentResponseStreamer(TextStreamer):
    def __init__(self, tokenizer, input_prompt_length: int, skip_prompt: bool = True, **decode_kwargs):
        super().__init__(tokenizer, skip_prompt=skip_prompt, **decode_kwargs)
        self.input_prompt_length = input_prompt_length
        self.first_token = True
        self.generated_text = ""  # accumulate the streamed response for the history

    def on_finalized_text(self, text: str, stream_end: bool = False):
        if self.first_token:
            print("Noli: ", end="", flush=True)
            self.first_token = False
        print(text, end="", flush=True)
        self.generated_text += text
        if stream_end:
            print()  # newline when the stream ends
# --- 3. Streaming generation ---
def generate_response_streaming(conversation_history: List[Tuple[str, str]]):
    """Stream a response for the conversation history and return its full text."""
    # Build the full conversation prompt
    prompt = ""
    for role, content in conversation_history:
        if role == "user":
            prompt += f"<|im_start|>user\n{content}<|im_end|>\n"
        else:
            prompt += f"<|im_start|>assistant\n{content}<|im_end|>\n"
    prompt += "<|im_start|>assistant\n"  # cue the assistant's reply

    # Tokenize once and move the inputs to the model's device
    encoded = tokenizer(prompt, return_tensors="pt")
    input_prompt_length = encoded["input_ids"].shape[1]
    inputs = {
        "input_ids": encoded["input_ids"].to(model.device),
        "attention_mask": encoded["attention_mask"].to(model.device),
    }

    # Set up the streamer
    streamer = CurrentResponseStreamer(
        tokenizer,
        input_prompt_length=input_prompt_length,
        skip_special_tokens=True
    )

    # Run generation in a separate thread so the streamer prints as tokens arrive
    generation_kwargs = dict(
        **inputs,
        max_new_tokens=512,
        temperature=0.7,
        top_p=0.9,
        repetition_penalty=1.1,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.pad_token_id,
        do_sample=True,
        streamer=streamer
    )
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    thread.join()  # wait for generation to finish

    # Return the text accumulated by the streamer. (Running generate a
    # second time here would sample a different response than the one
    # just printed, so the streamed text is reused for the history.)
    return streamer.generated_text.strip()
# --- 4. Chat loop ---
if __name__ == "__main__":
    conversation_history: List[Tuple[str, str]] = []
    print("Hello! I'm Noli, an AI assistant. Type 'quit' to end the chat.")
    while True:
        try:
            user_input = input("\nYou: ").strip()
            if not user_input:
                continue
            if user_input.lower() in ["退出", "quit", "exit"]:
                print("Noli: Goodbye! It was nice talking with you.")
                break
            # Update the conversation history
            conversation_history.append(("user", user_input))
            # Stream the response
            current_response = generate_response_streaming(conversation_history)
            # Save the full response to the history
            conversation_history.append(("assistant", current_response))
        except KeyboardInterrupt:
            print("\n\nNoli: Looks like you interrupted the chat. Goodbye!")
            break
        except Exception as e:
            print(f"\nNoli: Sorry, something went wrong while handling your request: {e}")
Workaround: restart the Colab session (menu bar -> Runtime -> Restart session).
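A lighter-weight alternative that is sometimes enough is to drop references to the large objects and empty the CUDA cache (a sketch; a full restart remains the reliable fix):

import gc
import torch

# Drop references to the large objects (the names depend on what is
# defined in your session)
del model, trainer
gc.collect()
torch.cuda.empty_cache()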
from google.colab import drive
import os
import shutil
# 1. Mount Google Drive
print("Mounting Google Drive...")
if not os.path.ismount('/content/drive'):
    drive.mount('/content/drive')
    print("Google Drive mounted.")
else:
    print("Google Drive is already mounted.")

# 2. Define the paths (adjust the destination as needed)
source_dir_path = '/content/lora_adapters_v2'  # source path of the LoRA weights
destination_folder_path = '/content/drive/MyDrive/lora'  # destination folder on Drive
destination_dir_path = os.path.join(destination_folder_path, os.path.basename(source_dir_path))

# Whether to overwrite an existing destination directory
OVERWRITE_EXISTING = True

# 3. Create the destination folder if it does not exist
os.makedirs(destination_folder_path, exist_ok=True)
print(f"Backup destination folder: {destination_folder_path}")

# 4. Validate the source directory
if not os.path.exists(source_dir_path) or not os.path.isdir(source_dir_path):
    print(f"❌ Error: source directory does not exist or is not a directory - {source_dir_path}")
else:
    try:
        # Estimate the size of the source directory
        source_size_mb = sum(
            os.path.getsize(os.path.join(dirpath, filename))
            for dirpath, dirnames, filenames in os.walk(source_dir_path)
            for filename in filenames
        ) / (1024 * 1024)
        print(f"✅ Found source directory: {source_dir_path} (estimated size: {source_size_mb:.2f} MB)")

        # Handle an already existing destination directory
        if os.path.exists(destination_dir_path):
            dest_size_mb = sum(
                os.path.getsize(os.path.join(dirpath, filename))
                for dirpath, dirnames, filenames in os.walk(destination_dir_path)
                for filename in filenames
            ) / (1024 * 1024)
            print(f"⚠️ Warning: destination directory already exists - {destination_dir_path} (estimated size: {dest_size_mb:.2f} MB)")
            if OVERWRITE_EXISTING:
                print("Deleting the old directory so it can be overwritten...")
                shutil.rmtree(destination_dir_path)
                print("Old directory deleted.")
            else:
                print("OVERWRITE_EXISTING=False, skipping the copy.")
                print("Set OVERWRITE_EXISTING to True if you want to overwrite it.")

        # Copy the directory to Drive
        if OVERWRITE_EXISTING or not os.path.exists(destination_dir_path):
            print("Copying the directory to Drive...")
            print(f"  Source: {source_dir_path}")
            print(f"  Destination: {destination_dir_path}")
            shutil.copytree(source_dir_path, destination_dir_path)
            print(f"✅ Directory backed up to Drive: {destination_dir_path}")
        else:
            print("Operation cancelled or skipped.")
    except Exception as e:
        print(f"❌ Copy error: {e}")