微信扫码
添加专属顾问
我要投稿
探索AI理解人类语言的第一步:文本预处理如何为机器学习铺路。核心内容: 1. 文本预处理的基本步骤与核心作用 2. 以《时间机器》为例的文本加载与清洗实战 3. 分词策略选择对后续处理的关键影响
在自然语言处理中,如何处理文本数据是首先要考虑的问题。一般的文本数据预处理步骤通常包括:
以H.G.Well编写的《time machine》科幻小说作为文本输入数据。 该小说大概有30000多个单词。 将数据集读取到由多条文本行组成的列表中,其中每条文本行都是一个字符串。 为简单起见,忽略标点符号和字母大写。
#coding=utf-8
import collections
import re
from d2l import torch as d2l
def read_time_machine():#@save
# 《time machine》1895年发表的一部科幻小说
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
lines = f.readlines()
'''
re.sub(pattern, repl, string, count=0, flags=0)
@ pattern:正则表达式模式字符串或预编译对象
@ repl:替换内容(字符串/函数,函数需返回字符串)
@ string:原始待处理字符串
@ count:最大替换次数(默认0表示全部替换)
@ flags:正则标志如 re.IGNORECASE
'''
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
lines = read_time_machine()
print(f'文本总行数: {len(lines)}')
print(lines[0])
输出:
Downloading ../data/timemachine.txt from http://d2l-data.s3-accelerate.amazonaws.com/timemachine.txt...
文本总行数: 3221
the time machine by h g wells
将文本中的句子,进行分词处理,拆分成单词或字母。可选择通过以单词形式(word)进行分词,也可以选择以字母形式(char)进行分词。
def tokenize(lines, token='word'): #@save
"""将文本行拆分为单词或单个字符"""
if token == 'word':
return [line.split() for line in lines]
elif token == 'char':
return [list(line) for line in lines]
else:
print('错误:未知词元类型:' + token)
tokens = tokenize(lines)
for i in range(3):
print(tokens[i])
输出:
['the', 'time', 'machine', 'by', 'h', 'g', 'wells']
[]
[]
分词后的类型是字符串,而模型需要的输入是数字,因此这种类型不方便模型使用。 现在,让构建一个字典,通常也叫做词表(vocabulary), 用来将字符串类型的词元映射到从(0)开始的数字索引中。 先将训练集中的所有文档合并在一起,对它们的唯一词元进行统计, 得到的统计结果称之为语料(corpus)。 然后根据每个唯一词元的出现频率,为其分配一个数字索引。 很少出现的词元通常被移除,这可以降低复杂性。 另外,语料库中不存在或已删除的任何词元都将映射到一个特定的未知词元“
# 词表化函数
class Vocab:
def __init__(self, tokens = None, min_freq = 0, reserved_tokens = None):
if tokens isNone:
tokens = []
if reserved_tokens isNone:
reserved_tokens = []
# 统计每个词出现的频率
counter = count_corpus(tokens)
# 按出现频率排序
# counter.items():将字典形式的计数器转换为(键, 值)元组列表
# counter = {'a':3, 'b':1, 'c':2},则转换为 [('a',3), ('b',1), ('c',2)]
# key=lambda x: x[1]:指定排序依据为元组的第二个元素
# reverse=True:设置为降序排列(从大到小)
self._token_freqs = sorted(counter.items(), key = lambda x: x[1], reverse= True)
# 未知词元的索引为0
'''
reserved_tokens = [0]
idx_to_token = ['<unk>'] + reserved_tokens
token_to_idx = {token: idx
for idx, token in enumerate(idx_to_token)}
输出:
['<unk>', 0]
{'<unk>': 0, 0: 1}
'''
self.idx_to_token = ['<unk>'] + reserved_tokens
self.token_to_idx = {token: idx
for idx, token in enumerate(self.idx_to_token)}
# 构建 idx_to_token数组和token_to_idx 字典
for token, freq in self._token_freqs:
if freq < min_freq:
break
if token notin self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
# 返回 token个数量
def __len__(self):
return len(self.idx_to_token)
# 给定多个token(list,tuple类型的)个,返回它们的indices
def __getitem__(self, tokens):
ifnot isinstance(tokens, (list, tuple)):
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]
# 给定多个返回它们的indices(list,tuple类型的)个,返回它们的token
def to_tokens(self, indices):
ifnot isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
return [self.idx_to_token[index] for index in indices]
@property
def unk(self):# 未知词元的索引为0
return0
@property
def token_freqs(self):
return self._token_freqs
# 统计每个词出现的频率 Counter({'the': 2261, 'i': 1267, 'and': 1245, ....})
def count_corpus(tokens):
""" 统计词元的频率"""
if len(tokens) == 0or isinstance(tokens[0], list):
# 将词元列表展平成一个列表
tokens = [token for line in tokens for token in line]
return collections.Counter(tokens)
vocab = Vocab(tokens)
print(list(vocab.token_to_idx.items())[:9])
输出:
[('<unk>', 0), ('the', 1), ('i', 2), ('and', 3), ('of', 4), ('a', 5), ('to', 6), ('was', 7), ('in', 8)]
将每一条文本行转换成一个数字索引列表。
for i in [0, 10]:
print('文本:', tokens[i])
print('索引:', vocab[tokens[i]])
输出:
文本: ['the', 'time', 'machine', 'by', 'h', 'g', 'wells']
索引: [1, 19, 50, 40, 2183, 2184, 400]
文本: ['twinkled', 'and', 'his', 'usually', 'pale', 'face', 'was', 'flushed', 'and', 'animated', 'the']
索引: [2186, 3, 25, 1044, 362, 113, 7, 1421, 3, 1045, 1]
将各个函数进行整合:
#coding=utf-8
import collections
import re
from d2l import torch as d2l
def read_time_machine():#@save
# 《time machine》1895年发表的一部科幻小说
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
lines = f.readlines()
'''
re.sub(pattern, repl, string, count=0, flags=0)
@ pattern:正则表达式模式字符串或预编译对象
@ repl:替换内容(字符串/函数,函数需返回字符串)
@ string:原始待处理字符串
@ count:最大替换次数(默认0表示全部替换)
@ flags:正则标志如 re.IGNORECASE
'''
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
def tokenize(lines, token='word'):#@save
"""将文本行拆分为单词或单个字符"""
if token == 'word':
return [line.split() for line in lines]
elif token == 'char':
return [list(line) for line in lines]
else:
print('错误:未知词元类型:' + token)
# 词表化函数
class Vocab:
def __init__(self, tokens = None, min_freq = 0, reserved_tokens = None):
if tokens isNone:
tokens = []
if reserved_tokens isNone:
reserved_tokens = []
# 统计每个词出现的频率
counter = self.count_corpus(tokens)
# 按出现频率排序
# counter.items():将字典形式的计数器转换为(键, 值)元组列表
# counter = {'a':3, 'b':1, 'c':2},则转换为 [('a',3), ('b',1), ('c',2)]
# key=lambda x: x[1]:指定排序依据为元组的第二个元素
# reverse=True:设置为降序排列(从大到小)
self._token_freqs = sorted(counter.items(), key = lambda x: x[1], reverse= True)
# 未知词元的索引为0
'''
reserved_tokens = [0]
idx_to_token = ['<unk>'] + reserved_tokens
token_to_idx = {token: idx
for idx, token in enumerate(idx_to_token)}
输出:
['<unk>', 0]
{'<unk>': 0, 0: 1}
'''
self.idx_to_token = ['<unk>'] + reserved_tokens
self.token_to_idx = {token: idx
for idx, token in enumerate(self.idx_to_token)}
# 构建 idx_to_token数组和token_to_idx 字典
for token, freq in self._token_freqs:
if freq < min_freq:
break
if token notin self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
# 返回 token个数量
def __len__(self):
return len(self.idx_to_token)
# 给定多个token(list,tuple类型的)个,返回它们的indices
def __getitem__(self, tokens):
ifnot isinstance(tokens, (list, tuple)):
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]
# 给定多个返回它们的indices(list,tuple类型的)个,返回它们的token
def to_tokens(self, indices):
ifnot isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
return [self.idx_to_token[index] for index in indices]
@property
def unk(self):# 未知词元的索引为0
return0
@property
def token_freqs(self):
return self._token_freqs
# 统计每个词出现的频率 Counter({'the': 2261, 'i': 1267, 'and': 1245, ....})
def count_corpus(self, tokens):
""" 统计词元的频率"""
if len(tokens) == 0or isinstance(tokens[0], list):
# 将词元列表展平成一个列表
tokens = [token for line in tokens for token in line]
return collections.Counter(tokens)
def load_corpus_time_machine(max_tokens = -1):
"""返回数据集的标记索引列表和词汇表"""
lines = read_time_machine()
tokens = tokenize(lines, 'word')
vocab = Vocab(tokens)
corpus = [vocab[token] for line in tokens for token in line]
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab
if __name__ == '__main__':
corpus, vocab = load_corpus_time_machine()
print(f'按词划分, 单词的数量{len(corpus)}')
print(f'按词划分, 词表的数量{len(vocab)}')
输出:
按词划分, 单词的数量32775
按词划分, 词表的数量4580
Process finished with exit code 0
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-07-04
Karpathy:我不是要造新词,是「上下文工程」对 Agent 来说太重要了
2025-07-04
AI Agent的核心:Context Engineering(上下文工程)
2025-07-04
AI Agent与AI Workflow:“对决”与“共生”,未来属于“混血儿”!
2025-07-04
破局AI内卷:揭秘驱动10倍效能的AI工作流三大核心技术支柱
2025-07-04
深度揭秘:下一代AI生产力,颠覆你的工作与认知?99%的人还没看懂!
2025-07-04
AI Agent时代的AI Workflow,重构未来工作流设计准则!
2025-07-04
MCP对AI Agent意味什么?深度解剖MCP的本质与未来影响力
2025-07-04
让你的 AI Agent 拥有“永不遗忘”的超能力:LangGraph 与 PostgreSQL 实现长期记忆的深度实践
2025-05-29
2025-04-11
2025-04-12
2025-04-06
2025-04-29
2025-04-12
2025-04-29
2025-05-07
2025-05-23
2025-05-07