2025-03-08 01:34:36 +08:00

430 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
特征提取模块:实现文本特征提取,包括语法特征、语义特征等
"""
import re
import numpy as np
from typing import List, Dict, Tuple, Optional, Any, Union, Set
from collections import Counter
import jieba.posseg as pseg
from config.system_config import CATEGORIES
from utils.logger import get_logger
from preprocessing.tokenization import ChineseTokenizer
logger = get_logger("FeatureExtraction")
class FeatureExtractor:
    """Base class for feature extractors; defines the shared interface."""

    def __init__(self):
        """Initialize the feature extractor."""
        pass

    def extract(self, text: str) -> Dict[str, Any]:
        """
        Extract features from a single text.

        Args:
            text: Input text.

        Returns:
            Mapping from feature names to feature values.

        Raises:
            NotImplementedError: Subclasses must override this method.
        """
        raise NotImplementedError("子类必须实现此方法")

    def batch_extract(self, texts: List[str]) -> List[Dict[str, Any]]:
        """
        Extract features for a batch of texts.

        Args:
            texts: List of input texts.

        Returns:
            One feature dict per input text, in order.
        """
        return list(map(self.extract, texts))

    def extract_as_vector(self, text: str) -> np.ndarray:
        """
        Extract features from a text as a numeric vector.

        Args:
            text: Input text.

        Returns:
            Feature vector.

        Raises:
            NotImplementedError: Subclasses must override this method.
        """
        raise NotImplementedError("子类必须实现此方法")

    def batch_extract_as_vector(self, texts: List[str]) -> np.ndarray:
        """
        Extract feature vectors for a batch of texts.

        Args:
            texts: List of input texts.

        Returns:
            Array stacking one feature vector per text.
        """
        return np.array(list(map(self.extract_as_vector, texts)))
class StatisticalFeatureExtractor(FeatureExtractor):
    """Statistical feature extractor: surface-level text statistics."""

    def __init__(self, tokenizer: Optional[ChineseTokenizer] = None):
        """
        Initialize the statistical feature extractor.

        Args:
            tokenizer: Tokenizer to use; when None, a fresh ChineseTokenizer
                is created.
        """
        super().__init__()
        self.tokenizer = tokenizer or ChineseTokenizer()

    def extract(self, text: str) -> Dict[str, Any]:
        """
        Extract statistical features from a text.

        Args:
            text: Input text.

        Returns:
            Dict of statistics: character/word/sentence counts, average word
            and sentence lengths, digit/Latin presence flags, punctuation
            ratio, and the five most frequent words with their counts.
        """
        if not text:
            # Zero-valued feature set for empty input.
            return {
                "char_count": 0,
                "word_count": 0,
                "sentence_count": 0,
                "avg_word_length": 0,
                "avg_sentence_length": 0,
                "contains_number": False,
                "contains_english": False,
                "punctuation_ratio": 0,
                "top_words": []
            }

        char_count = len(text)

        # Tokenize into a word list.
        words = self.tokenizer.tokenize(text, return_string=False)
        word_count = len(words)

        # Sentences are delimited by Chinese/ASCII terminal punctuation.
        sentences = [part for part in re.split(r'[。!?!?]+', text) if part.strip()]
        sentence_count = len(sentences)

        # Average word length in characters.
        avg_word_length = (sum(map(len, words)) / word_count) if word_count > 0 else 0

        # Average sentence length, measured in characters.
        avg_sentence_length = (char_count / sentence_count) if sentence_count > 0 else 0

        # Presence flags for digits and Latin letters.
        contains_number = re.search(r'\d', text) is not None
        contains_english = re.search(r'[a-zA-Z]', text) is not None

        # Punctuation ratio: characters that are neither word nor whitespace.
        punctuation_count = len(re.findall(r'[^\w\s]', text))
        punctuation_ratio = (punctuation_count / char_count) if char_count > 0 else 0

        # Five most frequent words.
        top_words = Counter(words).most_common(5)

        return {
            "char_count": char_count,
            "word_count": word_count,
            "sentence_count": sentence_count,
            "avg_word_length": avg_word_length,
            "avg_sentence_length": avg_sentence_length,
            "contains_number": contains_number,
            "contains_english": contains_english,
            "punctuation_ratio": punctuation_ratio,
            "top_words": top_words
        }

    def extract_as_vector(self, text: str) -> np.ndarray:
        """
        Extract statistical features as a numeric vector.

        Args:
            text: Input text.

        Returns:
            float32 vector of the eight numeric statistical features
            (top_words is omitted — it is not numeric).
        """
        features = self.extract(text)
        vector = [
            features['char_count'],
            features['word_count'],
            features['sentence_count'],
            features['avg_word_length'],
            features['avg_sentence_length'],
            1 if features['contains_number'] else 0,
            1 if features['contains_english'] else 0,
            features['punctuation_ratio'],
        ]
        return np.asarray(vector, dtype=np.float32)
class POSFeatureExtractor(FeatureExtractor):
    """Part-of-speech feature extractor built on jieba's POS tagger."""

    def __init__(self):
        """Initialize the POS feature extractor."""
        super().__init__()
        # Common Chinese POS tags with Chinese glosses. The insertion order
        # of this dict also fixes the layout of the vector returned by
        # extract_as_vector.
        self.pos_tags = {
            'n': '名词', 'f': '方位名词', 's': '处所名词', 't': '时间名词',
            'nr': '人名', 'ns': '地名', 'nt': '机构团体', 'nw': '作品名',
            'nz': '其他专名', 'v': '动词', 'vd': '副动词', 'vn': '名动词',
            'a': '形容词', 'ad': '副形词', 'an': '名形词', 'd': '副词',
            'm': '数词', 'q': '量词', 'r': '代词', 'p': '介词',
            'c': '连词', 'u': '助词', 'xc': '其他虚词', 'w': '标点符号'
        }

    def extract(self, text: str) -> Dict[str, Any]:
        """
        Extract POS features from a text.

        Args:
            text: Input text.

        Returns:
            Dict with:
                "pos_counts": POS tag -> number of tokens with that tag;
                "pos_ratios": POS tag -> fraction of all tokens (empty when
                    no tokens were produced).
        """
        if not text:
            return {
                "pos_counts": {},
                "pos_ratios": {}
            }
        # jieba tags every token; Counter replaces the hand-rolled
        # dict-increment loop.
        pos_counts = dict(Counter(pos for _, pos in pseg.cut(text)))
        total_count = sum(pos_counts.values())
        # Normalize counts to ratios.
        pos_ratios = (
            {pos: count / total_count for pos, count in pos_counts.items()}
            if total_count > 0 else {}
        )
        return {
            "pos_counts": pos_counts,
            "pos_ratios": pos_ratios
        }

    def extract_as_vector(self, text: str) -> np.ndarray:
        """
        Extract POS features as a vector of per-tag ratios.

        Args:
            text: Input text.

        Returns:
            float32 vector with one entry per tag in self.pos_tags (0.0 for
            tags absent from the text), in self.pos_tags key order.
        """
        features = self.extract(text)
        pos_ratios = features['pos_ratios']
        # Build the vector in the fixed self.pos_tags order.
        vector = [pos_ratios.get(pos, 0.0) for pos in self.pos_tags]
        return np.array(vector, dtype=np.float32)
class KeywordFeatureExtractor(FeatureExtractor):
    """Keyword feature extractor based on per-category keyword lists."""

    def __init__(self, category_keywords: Optional[Dict[str, List[str]]] = None):
        """
        Initialize the keyword feature extractor.

        Args:
            category_keywords: Mapping from category name to keyword list;
                when None, a built-in default keyword table is used.
        """
        super().__init__()
        self.category_keywords = category_keywords or self._get_default_keywords()
        self.tokenizer = ChineseTokenizer()

    def _get_default_keywords(self) -> Dict[str, List[str]]:
        """
        Build the default per-category keyword table.

        Returns:
            Mapping from category name to a list of sample keywords. Every
            category in CATEGORIES is guaranteed to have an entry.
        """
        # A few sample keywords per known category.
        default_keywords = {
            "体育": ["比赛", "运动", "球员", "冠军", "球队", "足球", "篮球"],
            "财经": ["股票", "基金", "投资", "市场", "经济", "金融", "股市"],
            "房产": ["房价", "楼市", "地产", "购房", "房贷", "物业", "小区"],
            "家居": ["装修", "家具", "设计", "卧室", "客厅", "厨房", "风格"],
            "教育": ["学校", "学生", "考试", "教育", "大学", "课程", "老师"],
            "科技": ["互联网", "科技", "创新", "数字", "智能", "研发", "技术"],
            "时尚": ["时尚", "潮流", "服装", "搭配", "品牌", "美容", "穿着"],
            "时政": ["政府", "政策", "国家", "发展", "会议", "主席", "总理"],
            "游戏": ["游戏", "玩家", "电竞", "网游", "手游", "角色", "任务"],
            "娱乐": ["明星", "电影", "节目", "综艺", "电视", "演员", "导演"],
            "其他": ["其他", "一般", "常见", "普通", "正常", "通常", "传统"]
        }
        # Ensure every configured category has at least one keyword
        # (the category name itself as a fallback).
        for category in CATEGORIES:
            default_keywords.setdefault(category, [category])
        return default_keywords

    def extract(self, text: str) -> Dict[str, Any]:
        """
        Extract keyword-match features from a text.

        Args:
            text: Input text.

        Returns:
            Dict with:
                "keyword_matches": category -> number of that category's
                    keywords found among the tokenized words;
                "keyword_match_ratios": category -> share of all matches
                    (normalized over categories; all 0.0 when nothing matches).
        """
        if not text:
            return {
                "keyword_matches": {cat: 0 for cat in self.category_keywords},
                "keyword_match_ratios": {cat: 0.0 for cat in self.category_keywords}
            }
        # A set gives O(1) membership tests per keyword.
        words = set(self.tokenizer.tokenize(text, return_string=False))
        # Count how many of each category's keywords appear in the text.
        keyword_matches = {
            category: sum(1 for kw in keywords if kw in words)
            for category, keywords in self.category_keywords.items()
        }
        # Normalize match counts across categories.
        total_matches = sum(keyword_matches.values())
        keyword_match_ratios = {
            cat: matches / total_matches if total_matches > 0 else 0.0
            for cat, matches in keyword_matches.items()
        }
        return {
            "keyword_matches": keyword_matches,
            "keyword_match_ratios": keyword_match_ratios
        }

    def extract_as_vector(self, text: str) -> np.ndarray:
        """
        Extract keyword features as a vector of per-category match ratios.

        Args:
            text: Input text.

        Returns:
            float32 vector with one entry per category, in CATEGORIES order
            (0.0 for categories with no configured keywords).
        """
        features = self.extract(text)
        match_ratios = features['keyword_match_ratios']
        # Fixed CATEGORIES order so the vector layout is stable.
        vector = [match_ratios.get(cat, 0.0) for cat in CATEGORIES]
        return np.array(vector, dtype=np.float32)

    def update_keywords(self, category: str, keywords: List[str]) -> None:
        """
        Replace the keyword list of a category.

        Args:
            category: Category name.
            keywords: New keyword list.
        """
        # Copy so later mutation of the caller's list cannot silently
        # change this extractor's state.
        self.category_keywords[category] = list(keywords)
        logger.info(f"已更新类别 {category} 的关键词,共 {len(keywords)}")

    def add_keywords(self, category: str, keywords: List[str]) -> None:
        """
        Add keywords to a category, creating the category if needed.

        Args:
            category: Category name.
            keywords: Keywords to add; duplicates are dropped.
        """
        if category in self.category_keywords:
            # Merge through a set so duplicates are dropped.
            merged = set(self.category_keywords[category])
            merged.update(keywords)
            self.category_keywords[category] = list(merged)
        else:
            # Copy to avoid aliasing the caller's list.
            self.category_keywords[category] = list(keywords)
        logger.info(f"已向类别 {category} 添加关键词,当前共 {len(self.category_keywords[category])}")
class CombinedFeatureExtractor(FeatureExtractor):
    """Combines the outputs of several feature extractors."""

    def __init__(self, extractors: List[FeatureExtractor]):
        """
        Initialize the combined feature extractor.

        Args:
            extractors: Feature extractors whose results will be combined.
        """
        super().__init__()
        self.extractors = extractors

    def extract(self, text: str) -> Dict[str, Any]:
        """
        Extract the combined features from a text.

        Args:
            text: Input text.

        Returns:
            Dict keyed by each extractor's class name, mapping to that
            extractor's feature dict.

        Note:
            Two extractors of the same class would share a key, and the
            later one overwrites the earlier — matches previous behavior.
        """
        # The original iterated with enumerate() but never used the index.
        return {
            type(extractor).__name__: extractor.extract(text)
            for extractor in self.extractors
        }

    def extract_as_vector(self, text: str) -> np.ndarray:
        """
        Extract the combined features as a single concatenated vector.

        Args:
            text: Input text.

        Returns:
            Concatenation of every extractor's feature vector, in extractor
            order; an empty float32 vector when there are no extractors.
        """
        feature_vectors = [extractor.extract_as_vector(text) for extractor in self.extractors]
        if not feature_vectors:
            # np.concatenate raises ValueError on an empty list.
            return np.array([], dtype=np.float32)
        return np.concatenate(feature_vectors)