"""
中文分词模块:负责中文文本分词处理
"""
import os
import jieba
import re
from typing import List, Dict, Tuple, Optional, Any, Set, Union
import pandas as pd
from collections import Counter
from config.system_config import STOPWORDS_DIR, ENCODING
from utils.logger import get_logger
from utils.file_utils import read_text_file, write_text_file, ensure_dir
logger = get_logger("Tokenization")

class ChineseTokenizer:
    """Chinese tokenizer based on jieba."""

    def __init__(self, user_dict_path: Optional[str] = None,
                 use_hmm: bool = True,
                 remove_stopwords: bool = True,
                 stopwords_path: Optional[str] = None,
                 add_custom_words: Optional[List[str]] = None):
        """
        Initialize the Chinese tokenizer.

        Args:
            user_dict_path: Path to a user-defined dictionary.
            use_hmm: Whether to use the HMM model for segmentation.
            remove_stopwords: Whether to remove stopwords.
            stopwords_path: Path to the stopword list; if None, the default list is used.
            add_custom_words: List of custom words to add to the dictionary.
        """
        self.use_hmm = use_hmm
        self.remove_stopwords = remove_stopwords

        # Load the user-defined dictionary
        if user_dict_path and os.path.exists(user_dict_path):
            jieba.load_userdict(user_dict_path)
            logger.info(f"Loaded user dictionary: {user_dict_path}")

        # Load stopwords
        self.stopwords = set()
        if remove_stopwords:
            self._load_stopwords(stopwords_path)

        # Add custom words
        if add_custom_words:
            for word in add_custom_words:
                jieba.add_word(word)
            logger.info(f"Added {len(add_custom_words)} custom words")
    def _load_stopwords(self, stopwords_path: Optional[str] = None) -> None:
        """
        Load stopwords.

        Args:
            stopwords_path: Path to the stopword list; if None, the default list is used.
        """
        # Fall back to the default stopword list if no path is given
        if not stopwords_path:
            stopwords_path = os.path.join(STOPWORDS_DIR, "chinese_stopwords.txt")

        # If the stopword list does not exist, create a default one
        if not os.path.exists(stopwords_path):
            ensure_dir(os.path.dirname(stopwords_path))
            # Common Chinese stopwords
            default_stopwords = [
                "的", "了", "和", "是", "就", "都", "而", "及", "与", "着", "或", "在",
                "我", "你", "他", "她", "它", "们", "这", "那", "有", "个", "也", "不",
                "为", "之", "于", "被", "把", "让", "向", "从", "对", "能", "会", "要"
            ]
            write_text_file("\n".join(default_stopwords), stopwords_path)
            logger.info(f"Stopword list not found; created default list: {stopwords_path}")

        # Load the stopword list
        try:
            with open(stopwords_path, "r", encoding=ENCODING) as f:
                for line in f:
                    word = line.strip()
                    if word:
                        self.stopwords.add(word)
            logger.info(f"Loaded {len(self.stopwords)} stopwords")
        except Exception as e:
            logger.error(f"Failed to load stopword list: {e}")
    def add_stopwords(self, words: Union[str, List[str]]) -> None:
        """
        Add stopwords.

        Args:
            words: Stopword(s) to add (a string or a list of strings).
        """
        if isinstance(words, str):
            self.stopwords.add(words.strip())
        else:
            for word in words:
                self.stopwords.add(word.strip())

    def remove_stopwords_from_list(self, words: List[str]) -> List[str]:
        """
        Remove stopwords from a list of words.

        Args:
            words: List of words.

        Returns:
            The list with stopwords removed.
        """
        if not self.remove_stopwords:
            return words
        return [word for word in words if word not in self.stopwords]
    def tokenize(self, text: str, return_string: bool = False,
                 cut_all: bool = False) -> Union[List[str], str]:
        """
        Tokenize a text.

        Args:
            text: Text to tokenize.
            return_string: Whether to return a space-separated string instead of a list.
            cut_all: Whether to use full mode (precise mode by default).

        Returns:
            Tokenization result (a list of words or a string).
        """
        if not text:
            return "" if return_string else []

        # Segment with jieba
        if cut_all:
            words = jieba.lcut(text, cut_all=True)
        else:
            words = jieba.lcut(text, HMM=self.use_hmm)

        # Remove stopwords
        if self.remove_stopwords:
            words = self.remove_stopwords_from_list(words)

        # Return the result
        if return_string:
            return " ".join(words)
        else:
            return words
    def batch_tokenize(self, texts: List[str], return_string: bool = False,
                       cut_all: bool = False) -> List[Union[List[str], str]]:
        """
        Tokenize a batch of texts.

        Args:
            texts: List of texts to tokenize.
            return_string: Whether to return space-separated strings instead of lists.
            cut_all: Whether to use full mode (precise mode by default).

        Returns:
            List of tokenization results.
        """
        return [self.tokenize(text, return_string, cut_all) for text in texts]
    def analyze_tokens(self, texts: List[str], top_n: int = 20) -> Dict[str, Any]:
        """
        Analyze the token frequency distribution of a set of texts.

        Args:
            texts: List of texts to analyze.
            top_n: Number of most frequent tokens to return.

        Returns:
            Dictionary with the frequency analysis results.
        """
        all_tokens = []
        for text in texts:
            tokens = self.tokenize(text, return_string=False)
            all_tokens.extend(tokens)

        # Count token frequencies
        token_counter = Counter(all_tokens)

        # Most common tokens
        most_common = token_counter.most_common(top_n)

        # Number of unique tokens
        unique_tokens = len(token_counter)

        return {
            "total_tokens": len(all_tokens),
            "unique_tokens": unique_tokens,
            "most_common": most_common,
            "token_counter": token_counter
        }
    def get_top_keywords(self, texts: List[str], top_n: int = 20,
                         min_freq: int = 3, min_length: int = 2) -> List[Tuple[str, int]]:
        """
        Get the top keywords from a set of texts.

        Args:
            texts: List of texts to analyze.
            top_n: Number of keywords to return.
            min_freq: Minimum token frequency.
            min_length: Minimum token length (in characters).

        Returns:
            List of (keyword, frequency) tuples.
        """
        tokens_analysis = self.analyze_tokens(texts)
        token_counter = tokens_analysis["token_counter"]

        # Keep tokens that satisfy the frequency and length thresholds
        filtered_keywords = [(word, count) for word, count in token_counter.items()
                             if count >= min_freq and len(word) >= min_length]

        # Sort by frequency, descending
        sorted_keywords = sorted(filtered_keywords, key=lambda x: x[1], reverse=True)

        return sorted_keywords[:top_n]
    def get_vocabulary(self, texts: List[str], min_freq: int = 1) -> List[str]:
        """
        Build a vocabulary from a set of texts.

        Args:
            texts: List of texts.
            min_freq: Minimum token frequency.

        Returns:
            Vocabulary (list of words).
        """
        tokens_analysis = self.analyze_tokens(texts)
        token_counter = tokens_analysis["token_counter"]

        # Keep tokens that reach the minimum frequency
        vocabulary = [word for word, count in token_counter.items() if count >= min_freq]

        return vocabulary
    def get_stopwords(self) -> Set[str]:
        """
        Get the stopword set.

        Returns:
            A copy of the stopword set.
        """
        return self.stopwords.copy()
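

# --- Usage sketch (illustrative addition, not part of the original module) ---
# A minimal example of how ChineseTokenizer might be driven, assuming jieba is
# installed and the config/ and utils/ packages imported above are importable.
# The sample sentences below are arbitrary placeholders.
if __name__ == "__main__":
    tokenizer = ChineseTokenizer(remove_stopwords=True)

    sample_texts = [
        "自然语言处理是人工智能的重要分支。",
        "中文分词是自然语言处理的基础任务之一。",
    ]

    # Single-text tokenization: as a list of words, or as a space-joined string
    print(tokenizer.tokenize(sample_texts[0]))
    print(tokenizer.tokenize(sample_texts[0], return_string=True))

    # Batch tokenization and simple corpus statistics
    print(tokenizer.batch_tokenize(sample_texts, return_string=True))
    stats = tokenizer.analyze_tokens(sample_texts, top_n=5)
    print(stats["total_tokens"], stats["unique_tokens"], stats["most_common"])

    # Keywords filtered by minimum frequency and length
    print(tokenizer.get_top_keywords(sample_texts, top_n=5, min_freq=1, min_length=2))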