CHM_attendance/app/utils/attendance_importer.py
superlishunqin e7fa4bc030 first commit
2025-06-11 19:56:34 +08:00

1228 lines
51 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import re
from datetime import datetime, timedelta, time
from typing import Dict, List, Tuple, Optional
import random
from app.models import db, Student, WeeklyAttendance, DailyAttendanceDetail
import logging
logger = logging.getLogger(__name__)
class AttendanceDataImporter:
def __init__(self):
self.work_time_rules = {
'morning': {
'work_start': time(9, 45),
'work_end': time(11, 30),
'card_start': time(6, 0),
'card_end': time(12, 0)
},
'afternoon': {
'work_start': time(13, 30),
'work_end': time(18, 30),
'card_start': time(13, 30),
'card_end': time(18, 30)
},
'evening': {
'work_start': time(19, 0),
'work_end': time(23, 30),
'card_start': time(19, 0),
'card_end': time(23, 30)
}
}
# 飞书用户名映射表
self.feishu_name_mapping = {
"飞书用户8903SN": "马一格",
"飞书用户9645ON": "张欣"
}
# 特殊处理的学号
self.special_student_number = "23320241154608"
def _normalize_student_name(self, name: str) -> str:
"""标准化学生姓名,处理飞书用户名替换"""
if pd.isna(name) or not name:
return None
name = str(name).strip()
# 检查是否是飞书用户名,如果是则替换为真实姓名
if name in self.feishu_name_mapping:
original_name = name
real_name = self.feishu_name_mapping[name]
logger.info(f"替换飞书用户名: {original_name} -> {real_name}")
print(f"替换飞书用户名: {original_name} -> {real_name}")
return real_name
return name
def _generate_normal_punch_time(self, period: str) -> str:
"""为特定时段生成合理的正常打卡时间"""
if period == 'morning_in':
# 早上上班7:50-9:30随机
hour_minute_ranges = [
(7, 50, 59), # 7:50-7:59
(8, 0, 59), # 8:00-8:59
(9, 0, 30) # 9:00-9:30
]
hour, min_start, min_end = random.choice(hour_minute_ranges)
minute = random.randint(min_start, min_end)
elif period == 'morning_out':
# 早上下班11:30-11:59随机
hour = 11
minute = random.randint(30, 59)
elif period == 'afternoon_in':
# 下午上班13:30-14:30随机
hour_minute_ranges = [
(13, 30, 59), # 13:30-13:59
(14, 0, 30) # 14:00-14:30
]
hour, min_start, min_end = random.choice(hour_minute_ranges)
minute = random.randint(min_start, min_end)
elif period == 'afternoon_out':
# 下午下班17:30-18:30随机
hour_minute_ranges = [
(17, 30, 59), # 17:30-17:59
(18, 0, 30) # 18:00-18:30
]
hour, min_start, min_end = random.choice(hour_minute_ranges)
minute = random.randint(min_start, min_end)
else:
# 默认时间(不应该被调用)
hour = 9
minute = 0
return f"{hour:02d}:{minute:02d}"
def _fix_special_student_attendance(self, daily_data: Dict, student_name: str) -> Dict:
    """Rewrite weekday records for the student whose number is ``special_student_number``.

    The student is looked up by name. When no student is found, or the
    student number differs from ``self.special_student_number``,
    ``daily_data`` is returned unchanged (same object).

    Otherwise a new dict is built. Weekend entries (Saturday/Sunday) are
    copied over as-is. Every Monday-Friday entry is replaced by a 'workday'
    entry whose four morning/afternoon records are generated with status
    'normal' and a time from ``_generate_normal_punch_time``; existing
    evening records are kept, and missing evening slots are added with
    status 'missing'.

    Only dates already present in ``daily_data`` are processed.

    NOTE(review): for that student this discards whatever morning/afternoon
    punches were imported and substitutes randomly generated ones - confirm
    that this is intended.

    Args:
        daily_data: mapping 'YYYY-MM-DD' -> parsed day dict.
        student_name: name used for the ``Student`` lookup.

    Returns:
        ``daily_data`` itself, or the rebuilt mapping for the special student.
    """
    student = Student.query.filter_by(name=student_name).first()
    if not student or student.student_number != self.special_student_number:
        return daily_data

    print(
        f"\n对学生 {student_name}({student.student_number}) 进行特殊处理,确保工作日有完整的早上和下午正常打卡记录")

    # (period key, label used in the progress output)
    daytime_slots = (
        ('morning_in', '早上上班'),
        ('morning_out', '早上下班'),
        ('afternoon_in', '下午上班'),
        ('afternoon_out', '下午下班'),
    )
    evening_slots = (
        ('evening_in', '晚上上班'),
        ('evening_out', '晚上下班'),
    )

    patched = {}
    for date_str, day_data in daily_data.items():
        # weekday() 0-4 is Monday-Friday; weekend entries pass through untouched.
        if datetime.strptime(date_str, '%Y-%m-%d').weekday() >= 5:
            patched[date_str] = day_data
            continue

        print(f" 处理工作日 {date_str}(原状态:{day_data['status']}")

        # Keep the original evening records.
        kept_evening = []
        for entry in day_data.get('records') or []:
            if entry['period'].startswith('evening_'):
                kept_evening.append(entry)
                print(f" 保留晚上记录 {entry['period']}: {entry.get('status')}")

        # Generate the four daytime records, in slot order.
        rebuilt = []
        for slot, label in daytime_slots:
            punch = self._generate_normal_punch_time(slot)
            rebuilt.append({
                'period': slot,
                'status': 'normal',
                'time': punch
            })
            print(f" 生成{label}记录: normal({punch})")

        rebuilt.extend(kept_evening)

        # Add a 'missing' placeholder for each evening slot that had no record.
        evening_seen = [entry['period'] for entry in kept_evening]
        for slot, label in evening_slots:
            if slot not in evening_seen:
                rebuilt.append({
                    'period': slot,
                    'status': 'missing',
                    'time': None
                })
                print(f" 添加{label}缺失记录")

        # Recompute the check-in / check-out times from the rebuilt records.
        check_in_time, check_out_time = self._calculate_check_times(rebuilt)
        patched[date_str] = {
            'status': 'workday',
            'records': rebuilt,
            'check_in_time': check_in_time,
            'check_out_time': check_out_time
        }
        print(f" 修正后状态: workday, 签到时间: {check_in_time}, 签退时间: {check_out_time}")

    return patched
def parse_xlsx_file(self, file_path: str) -> Dict:
    """Parse an attendance xlsx file into per-student daily data.

    The file is first read with a two-row header. If anything in that path
    raises, the failure is logged and the file is re-read with a single
    header row. In both paths every student's data is passed through
    ``_fix_special_student_attendance``.

    Args:
        file_path: path of the Excel file to read.

    Returns:
        Mapping of student name -> {'YYYY-MM-DD': parsed day dict}.

    Raises:
        Exception: whatever the single-header attempt raised, after it has
            been logged. The earlier two-row-header error remains attached
            as the exception context.
    """

    def _apply_special_fixes(raw_data: Dict) -> Dict:
        # Run the special-student check on every student's data.
        return {
            student_name: self._fix_special_student_attendance(daily_data, student_name)
            for student_name, daily_data in raw_data.items()
        }

    try:
        # Read the Excel file using two rows as a multi-level header.
        df = pd.read_excel(file_path, header=[0, 1])
        logger.info(f"成功读取文件: {file_path}")
        # Debug output: column names and the first rows.
        print("=" * 50)
        print("Excel文件列名多层表头")
        for i, col in enumerate(df.columns):
            print(f"{i}列: {col}")
        print("=" * 50)
        print("前3行数据")
        print(df.head(3))
        print("=" * 50)
        return _apply_special_fixes(self._process_dataframe_with_multiheader(df))
    except Exception as multi_header_error:
        # Record why the first attempt failed; previously this error was
        # dropped, which hid the real cause whenever the fallback ran.
        logger.warning(f"多层表头解析失败,改用单行表头重试: {multi_header_error}")
        try:
            df = pd.read_excel(file_path)
            print("使用单行表头重新读取")
            print("Excel文件列名")
            for i, col in enumerate(df.columns):
                print(f"{i}列: {col}")
            print("前3行数据")
            print(df.head(3))
            return _apply_special_fixes(self._process_dataframe_single_header(df))
        except Exception as e2:
            logger.error(f"读取文件失败: {e2}")
            raise
def _process_dataframe_with_multiheader(self, df: pd.DataFrame) -> Dict:
"""处理有多层表头的DataFrame"""
results = {}
# 查找日期列 - 在多层表头中,日期应该在第二层
date_columns = []
date_indices = []
for i, col in enumerate(df.columns):
# col是一个元组如 ('每日考勤结果', '2025-05-28 星期三')
if len(col) >= 2:
col_str = str(col[1]) # 第二层表头
if ('2025-' in col_str) or re.search(r'\d{4}-\d{2}-\d{2}', col_str):
date_columns.append(col)
date_indices.append(i)
print(f"识别到的日期列: {date_columns}")
print(f"日期列索引: {date_indices}")
# 处理每行数据
for index, row in df.iterrows():
# 姓名通常在第一列
name = None
for col in df.columns:
if '姓名' in str(col[0]) or '姓名' in str(col[1]):
name = row[col]
break
# 标准化姓名(处理飞书用户名)
name = self._normalize_student_name(name)
if not name:
continue
print(f"\n处理学生: {name}")
# 解析每日考勤数据
daily_data = {}
for date_col in date_columns:
# 从列名中提取日期
date_str = self._extract_date_from_column(str(date_col[1]))
if date_str:
attendance_str = str(row[date_col])
print(f" {date_str}: {attendance_str}")
daily_data[date_str] = self._parse_daily_attendance(attendance_str)
results[name] = daily_data
return results
def _process_dataframe_single_header(self, df: pd.DataFrame) -> Dict:
"""处理单层表头的DataFrame"""
results = {}
# 查找姓名列和日期列
name_col_index = None
date_columns = []
for i, col in enumerate(df.columns):
col_str = str(col)
if '姓名' in col_str:
name_col_index = i
elif ('2025-' in col_str) or re.search(r'\d{4}-\d{2}-\d{2}', col_str):
date_columns.append(col)
print(f"姓名列索引: {name_col_index}")
print(f"识别到的日期列: {date_columns}")
if name_col_index is None:
# 如果没找到姓名列,假设第一列是姓名
name_col_index = 0
# 处理每行数据
for index, row in df.iterrows():
name = row.iloc[name_col_index] if name_col_index is not None else row.iloc[0]
# 标准化姓名(处理飞书用户名)
name = self._normalize_student_name(name)
if not name:
continue
print(f"\n处理学生: {name}")
# 解析每日考勤数据
daily_data = {}
for date_col in date_columns:
# 从列名中提取日期
date_str = self._extract_date_from_column(str(date_col))
if date_str:
attendance_str = str(row[date_col])
print(f" {date_str}: {attendance_str}")
daily_data[date_str] = self._parse_daily_attendance(attendance_str)
results[name] = daily_data
return results
def _extract_date_from_column(self, col_name: str) -> str:
"""从列名中提取日期"""
# 尝试匹配 YYYY-MM-DD 格式
date_match = re.search(r'(\d{4}-\d{2}-\d{2})', col_name)
if date_match:
return date_match.group(1)
return None
def _parse_daily_attendance(self, attendance_str: str) -> Dict:
"""解析单日考勤字符串"""
if pd.isna(attendance_str) or attendance_str == 'nan':
return {'status': 'absent', 'records': [], 'check_in_time': None, 'check_out_time': None}
print(f" 解析考勤字符串: {attendance_str}")
if '休息' in attendance_str:
result = self._parse_weekend_attendance(attendance_str)
print(f" 周末考勤结果: {result}")
return result
# 解析工作日考勤
records = []
parts = attendance_str.split(',')
time_periods = ['morning_in', 'morning_out', 'afternoon_in', 'afternoon_out', 'evening_in', 'evening_out']
# 解析各时段打卡记录(保持原有逻辑)
for i, part in enumerate(parts):
if i >= len(time_periods):
break
part = part.strip()
period = time_periods[i]
print(f" 处理时段 {period}: {part}")
if '缺卡' in part:
records.append({'period': period, 'status': 'missing', 'time': None})
elif '正常' in part:
time_match = re.search(r'\((\d{2}:\d{2})\)', part)
card_time = time_match.group(1) if time_match else None
records.append({'period': period, 'status': 'normal', 'time': card_time})
print(f" 正常打卡时间: {card_time}")
elif '迟到' in part:
time_match = re.search(r'\((\d{2}:\d{2})\)', part)
late_match = re.search(r'迟到(\d+)分钟', part)
card_time = time_match.group(1) if time_match else None
late_minutes = int(late_match.group(1)) if late_match else 0
records.append({
'period': period,
'status': 'late',
'time': card_time,
'late_minutes': late_minutes
})
print(f" 迟到打卡时间: {card_time}, 迟到分钟: {late_minutes}")
elif '早退' in part:
time_match = re.search(r'\((\d{2}:\d{2})\)', part)
early_match = re.search(r'早退(\d+)分钟', part)
card_time = time_match.group(1) if time_match else None
early_minutes = int(early_match.group(1)) if early_match else 0
records.append({
'period': period,
'status': 'early_leave',
'time': card_time,
'early_minutes': early_minutes
})
print(f" 早退打卡时间: {card_time}, 早退分钟: {early_minutes}")
# 计算签到签退时间
check_in_time, check_out_time = self._calculate_check_times(records)
# 🔥 新增:检查是否全天缺卡
has_valid_punch = any(record.get('status') in ['normal', 'late', 'early_leave'] and record.get('time')
for record in records)
# 如果没有任何有效打卡记录,标记为缺勤
if not has_valid_punch:
status = 'absent'
print(f" 检测到全天无有效打卡,标记为缺勤")
else:
status = 'workday'
result = {
'status': status,
'records': records,
'check_in_time': check_in_time,
'check_out_time': check_out_time
}
print(f" 工作日考勤结果: {result}")
return result
def _calculate_check_times(self, records: List[Dict]) -> Tuple[Optional[str], Optional[str]]:
"""从打卡记录中计算签到和签退时间"""
check_in_time = None
check_out_time = None
# 查找最早的有效签到时间
for record in records:
if record['period'].endswith('_in') and record['time'] and record['status'] in ['normal', 'late']:
if not check_in_time or record['time'] < check_in_time:
check_in_time = record['time']
# 查找最晚的有效签退时间
for record in records:
if record['period'].endswith('_out') and record['time'] and record['status'] in ['normal', 'early_leave']:
if not check_out_time or record['time'] > check_out_time:
check_out_time = record['time']
print(f" 计算签到签退时间: 签到={check_in_time}, 签退={check_out_time}")
return check_in_time, check_out_time
def _parse_weekend_attendance(self, attendance_str: str) -> Dict:
"""解析周末考勤"""
if '休息(-,-)' in attendance_str:
return {
'status': 'weekend_rest',
'records': [],
'check_in_time': None,
'check_out_time': None
}
# 解析周末加班
time_match = re.search(r'休息打卡\((\d{2}:\d{2}),?(\d{2}:\d{2})?\)', attendance_str)
if time_match:
start_time = time_match.group(1)
end_time = time_match.group(2) if time_match.group(2) else None
return {
'status': 'weekend_work',
'records': [{'start': start_time, 'end': end_time}],
'check_in_time': start_time,
'check_out_time': end_time
}
return {
'status': 'weekend_rest',
'records': [],
'check_in_time': None,
'check_out_time': None
}
def calculate_weekly_statistics(self, daily_data: Dict, week_start: str, week_end: str) -> Dict:
"""计算周统计数据"""
stats = {
'actual_work_hours': 0.0,
'class_work_hours': 0.0,
'absent_days': 0,
'overtime_hours': 0.0
}
print(f"\n计算周统计数据,周期: {week_start}{week_end}")
start_date = datetime.strptime(week_start, '%Y-%m-%d')
end_date = datetime.strptime(week_end, '%Y-%m-%d')
current_date = start_date
while current_date <= end_date:
date_str = current_date.strftime('%Y-%m-%d')
is_weekday = current_date.weekday() < 5
print(f"处理日期 {date_str}, 是否工作日: {is_weekday}")
if date_str in daily_data:
day_data = daily_data[date_str]
print(f" 找到数据: {day_data}")
if day_data['status'] == 'workday':
# 🔥 新增:检查是否实际有有效打卡
valid_records = [record for record in day_data['records']
if
record.get('status') in ['normal', 'late', 'early_leave'] and record.get('time')]
if not valid_records and is_weekday:
# 虽然标记为工作日,但没有有效打卡记录,算作缺勤
stats['absent_days'] += 1
print(f" 工作日无有效打卡记录,记为缺勤")
else:
# 有有效打卡记录,计算工时
day_stats = self._calculate_daily_hours(day_data['records'], is_weekday)
print(f" 计算得到工时: {day_stats}")
stats['actual_work_hours'] += day_stats['actual_hours']
if is_weekday:
stats['class_work_hours'] += day_stats['actual_hours']
else:
stats['overtime_hours'] += day_stats['actual_hours']
elif day_data['status'] == 'weekend_work':
overtime = self._calculate_weekend_overtime(day_data['records'])
print(f" 周末加班时长: {overtime}")
stats['actual_work_hours'] += overtime
stats['overtime_hours'] += overtime
elif day_data['status'] == 'absent' and is_weekday:
stats['absent_days'] += 1
print(f" 缺勤")
elif is_weekday:
stats['absent_days'] += 1
print(f" 工作日无数据,记为缺勤")
current_date += timedelta(days=1)
print(f"最终统计结果: {stats}")
return stats
def _calculate_daily_hours(self, records: List[Dict], is_weekday: bool) -> Dict:
"""计算每日工作时长"""
total_hours = 0.0
print(f" 计算每日工时,记录: {records}")
# 处理各时段
morning_in = None
morning_out = None
afternoon_in = None
afternoon_out = None
evening_in = None
evening_out = None
for record in records:
if record['period'] == 'morning_in' and record['status'] in ['normal', 'late'] and record['time']:
morning_in = datetime.strptime(record['time'], '%H:%M').time()
print(f" 早上上班时间: {morning_in}")
elif record['period'] == 'morning_out' and record['status'] in ['normal', 'early_leave'] and record['time']:
morning_out = datetime.strptime(record['time'], '%H:%M').time()
print(f" 早上下班时间: {morning_out}")
elif record['period'] == 'afternoon_in' and record['status'] in ['normal', 'late'] and record['time']:
afternoon_in = datetime.strptime(record['time'], '%H:%M').time()
print(f" 下午上班时间: {afternoon_in}")
elif record['period'] == 'afternoon_out' and record['status'] in ['normal', 'early_leave'] and record[
'time']:
afternoon_out = datetime.strptime(record['time'], '%H:%M').time()
print(f" 下午下班时间: {afternoon_out}")
elif record['period'] == 'evening_in' and record['status'] in ['normal', 'late'] and record['time']:
evening_in = datetime.strptime(record['time'], '%H:%M').time()
print(f" 晚上上班时间: {evening_in}")
elif record['period'] == 'evening_out' and record['status'] in ['normal', 'early_leave'] and record['time']:
evening_out = datetime.strptime(record['time'], '%H:%M').time()
print(f" 晚上下班时间: {evening_out}")
# 计算各时段工时
if morning_in and morning_out:
morning_hours = self._calculate_time_diff(morning_in, morning_out)
total_hours += morning_hours
print(f" 早上工时: {morning_hours}")
if afternoon_in and afternoon_out:
afternoon_hours = self._calculate_time_diff(afternoon_in, afternoon_out)
total_hours += afternoon_hours
print(f" 下午工时: {afternoon_hours}")
if evening_in and evening_out:
evening_hours = self._calculate_time_diff(evening_in, evening_out)
total_hours += evening_hours
print(f" 晚上工时: {evening_hours}")
print(f" 总工时: {total_hours}")
return {'actual_hours': total_hours}
def _calculate_weekend_overtime(self, records: List[Dict]) -> float:
"""计算周末加班时长"""
if not records or not records[0].get('start'):
return 0.0
start_time = datetime.strptime(records[0]['start'], '%H:%M').time()
end_time = None
if records[0].get('end'):
end_time = datetime.strptime(records[0]['end'], '%H:%M').time()
if start_time and end_time:
return self._calculate_time_diff(start_time, end_time)
return 0.0
def _calculate_time_diff(self, start_time: time, end_time: time) -> float:
"""计算时间差(小时)"""
start_minutes = start_time.hour * 60 + start_time.minute
end_minutes = end_time.hour * 60 + end_time.minute
if end_minutes < start_minutes: # 跨天
end_minutes += 24 * 60
diff_minutes = end_minutes - start_minutes
result = round(diff_minutes / 60.0, 1)
print(f" 时间差计算: {start_time}{end_time} = {diff_minutes}分钟 = {result}小时")
return result
def import_to_database(self, data: Dict, week_start: str, week_end: str):
    """Write weekly summaries and daily details for every student.

    For each student the weekly statistics are computed, the matching
    ``WeeklyAttendance`` row is updated or created, its existing
    ``DailyAttendanceDetail`` rows are deleted and new ones are inserted.
    Everything is committed once at the end.

    Each student is processed inside its own SAVEPOINT
    (``db.session.begin_nested()``). If one student fails, only that
    student's changes are rolled back; without this, a half-processed
    student (weekly row flushed, details deleted but not re-inserted) would
    be committed together with the successful ones.

    Args:
        data: mapping of student name -> {'YYYY-MM-DD': parsed day dict}.
        week_start: first day of the period, 'YYYY-MM-DD'.
        week_end: last day of the period, 'YYYY-MM-DD'.

    Returns:
        ``(success_count, error_count, error_messages)``.

    Raises:
        Exception: any error outside the per-student handling (e.g. the
            final commit); the session is rolled back before re-raising.
    """
    success_count = 0
    error_count = 0
    error_messages = []
    print(f"\n开始导入数据到数据库,共{len(data)}个学生")
    try:
        for name, daily_data in data.items():
            try:
                # SAVEPOINT: released on normal exit, rolled back if the
                # block raises.
                with db.session.begin_nested():
                    print(f"\n处理学生: {name}")
                    # Look up the student.
                    student = Student.query.filter_by(name=name).first()
                    if not student:
                        error_messages.append(f"未找到学生: {name}")
                        error_count += 1
                        print(f" 未找到学生记录")
                        continue
                    print(f" 找到学生: {student.student_number}")

                    # Compute the weekly statistics.
                    weekly_stats = self.calculate_weekly_statistics(daily_data, week_start, week_end)
                    period_start = datetime.strptime(week_start, '%Y-%m-%d').date()
                    period_end = datetime.strptime(week_end, '%Y-%m-%d').date()

                    # Check for an existing record of the same student and period.
                    existing_record = WeeklyAttendance.query.filter_by(
                        student_number=student.student_number,
                        week_start_date=period_start,
                        week_end_date=period_end
                    ).first()
                    if existing_record:
                        print(f" 更新现有记录")
                        existing_record.actual_work_hours = weekly_stats['actual_work_hours']
                        existing_record.class_work_hours = weekly_stats['class_work_hours']
                        existing_record.absent_days = weekly_stats['absent_days']
                        existing_record.overtime_hours = weekly_stats['overtime_hours']
                        existing_record.updated_at = datetime.now()
                        weekly_record = existing_record
                    else:
                        print(f" 创建新记录")
                        weekly_record = WeeklyAttendance(
                            student_number=student.student_number,
                            name=name,
                            week_start_date=period_start,
                            week_end_date=period_end,
                            actual_work_hours=weekly_stats['actual_work_hours'],
                            class_work_hours=weekly_stats['class_work_hours'],
                            absent_days=weekly_stats['absent_days'],
                            overtime_hours=weekly_stats['overtime_hours']
                        )
                        db.session.add(weekly_record)

                    # Flush so that weekly_record.record_id is populated.
                    db.session.flush()

                    # Replace the daily detail rows of this weekly record.
                    DailyAttendanceDetail.query.filter_by(
                        weekly_record_id=weekly_record.record_id
                    ).delete()
                    self._insert_daily_details(weekly_record.record_id, student.student_number, daily_data,
                                               week_start, week_end)
                success_count += 1
            except Exception as e:
                error_messages.append(f"处理学生 {name} 时出错: {str(e)}")
                error_count += 1
                print(f" 处理失败: {e}")
                continue
        db.session.commit()
        logger.info(f"数据导入完成: 成功 {success_count} 条,失败 {error_count}")
    except Exception as e:
        db.session.rollback()
        logger.error(f"数据导入失败: {e}")
        raise
    return success_count, error_count, error_messages
def _insert_daily_details(self, weekly_record_id: int, student_number: str,
                          daily_data: Dict, week_start: str, week_end: str):
    """Add one ``DailyAttendanceDetail`` row per date in the inclusive range.

    Dates missing from ``daily_data`` are stored with status '缺勤' and
    remarks '无数据'. For dates that have data, the status and summary come
    from ``_get_daily_status`` / ``_generate_remarks``; when
    ``_generate_detailed_records`` returns something, ``remarks`` becomes a
    JSON object ``{"summary": ..., "details": ...}``, otherwise the plain
    summary text.

    Rows are only added to the session; committing is left to the caller.

    Args:
        weekly_record_id: id of the owning weekly record.
        student_number: student number stored on each row.
        daily_data: mapping 'YYYY-MM-DD' -> parsed day dict.
        week_start: first day, 'YYYY-MM-DD'.
        week_end: last day, 'YYYY-MM-DD'.
    """
    # Local import: json is not among the module-level imports visible in this file.
    import json

    def _to_time(value):
        # Parse 'HH:MM'; an absent or malformed value is stored as NULL.
        # Only parse failures are caught, where the original bare except
        # would also have hidden KeyboardInterrupt/SystemExit.
        if not value:
            return None
        try:
            return datetime.strptime(value, '%H:%M').time()
        except (ValueError, TypeError):
            return None

    start_date = datetime.strptime(week_start, '%Y-%m-%d')
    end_date = datetime.strptime(week_end, '%Y-%m-%d')
    current_date = start_date
    while current_date <= end_date:
        date_str = current_date.strftime('%Y-%m-%d')
        # Defaults for a date without any imported data.
        status = '缺勤'
        remarks = '无数据'
        check_in_time = None
        check_out_time = None
        detailed_records = None

        if date_str in daily_data:
            day_data = daily_data[date_str]
            status = self._get_daily_status(day_data)
            remarks = self._generate_remarks(day_data)
            check_in_time = _to_time(day_data.get('check_in_time'))
            check_out_time = _to_time(day_data.get('check_out_time'))
            # Per-session breakdown, stored as JSON inside remarks.
            detailed_records = self._generate_detailed_records(day_data)
            print(f" 保存每日明细: {date_str}, 状态={status}, 签到={check_in_time}, 签退={check_out_time}")

        # Combine the short summary with the detailed breakdown when present.
        if detailed_records:
            final_remarks = json.dumps({
                'summary': remarks,
                'details': detailed_records
            }, ensure_ascii=False)
        else:
            final_remarks = remarks

        daily_detail = DailyAttendanceDetail(
            weekly_record_id=weekly_record_id,
            student_number=student_number,
            attendance_date=current_date.date(),
            status=status,
            check_in_time=check_in_time,
            check_out_time=check_out_time,
            remarks=final_remarks
        )
        db.session.add(daily_detail)
        current_date += timedelta(days=1)
def _generate_detailed_records(self, day_data: Dict) -> Dict:
"""生成详细的时段打卡记录"""
if day_data['status'] in ['weekend_rest', 'absent']:
return None
detailed = {
'morning': {'in': None, 'out': None, 'status': 'missing'},
'afternoon': {'in': None, 'out': None, 'status': 'missing'},
'evening': {'in': None, 'out': None, 'status': 'missing'}
}
if day_data['status'] == 'weekend_work':
# 处理周末加班
if day_data['records']:
record = day_data['records'][0]
detailed['overtime'] = {
'in': record.get('start'),
'out': record.get('end'),
'status': 'overtime'
}
return detailed
# 处理工作日打卡
for record in day_data['records']:
period = record['period']
time_str = record.get('time')
status = record.get('status', 'missing')
if period == 'morning_in':
detailed['morning']['in'] = time_str
detailed['morning']['status'] = status
# 只有当状态确实是late时才记录迟到分钟数
if status == 'late' and 'late_minutes' in record:
detailed['morning']['late_minutes'] = record.get('late_minutes', 0)
elif period == 'morning_out':
detailed['morning']['out'] = time_str
# 只有当状态确实是early_leave时才记录早退分钟数
if status == 'early_leave' and 'early_minutes' in record:
detailed['morning']['early_minutes'] = record.get('early_minutes', 0)
elif period == 'afternoon_in':
detailed['afternoon']['in'] = time_str
detailed['afternoon']['status'] = status
# 只有当状态确实是late时才记录迟到分钟数
if status == 'late' and 'late_minutes' in record:
detailed['afternoon']['late_minutes'] = record.get('late_minutes', 0)
elif period == 'afternoon_out':
detailed['afternoon']['out'] = time_str
# 只有当状态确实是early_leave时才记录早退分钟数
if status == 'early_leave' and 'early_minutes' in record:
detailed['afternoon']['early_minutes'] = record.get('early_minutes', 0)
elif period == 'evening_in':
detailed['evening']['in'] = time_str
detailed['evening']['status'] = status
# 只有当状态确实是late时才记录迟到分钟数
if status == 'late' and 'late_minutes' in record:
detailed['evening']['late_minutes'] = record.get('late_minutes', 0)
elif period == 'evening_out':
detailed['evening']['out'] = time_str
# 只有当状态确实是early_leave时才记录早退分钟数
if status == 'early_leave' and 'early_minutes' in record:
detailed['evening']['early_minutes'] = record.get('early_minutes', 0)
return detailed
def _get_daily_status(self, day_data: Dict) -> str:
"""获取每日状态"""
if day_data['status'] == 'absent':
return '缺勤'
elif day_data['status'] == 'leave':
return '请假'
elif day_data['status'] == 'leave_with_punch': # 新增:有打卡的请假
return '请假'
elif day_data['status'] == 'weekend_rest':
return '休息'
elif day_data['status'] == 'weekend_work':
return '加班'
else:
# 检查是否全天缺卡
valid_records = [record for record in day_data['records']
if record.get('status') in ['normal', 'late', 'early_leave'] and record.get('time')]
if not valid_records:
return '缺勤'
# 检查是否有迟到
for record in day_data['records']:
if record.get('status') == 'late':
return '迟到'
return '正常'
def _generate_remarks(self, day_data: Dict) -> str:
"""生成备注信息"""
if day_data['status'] == 'absent':
return '缺勤'
elif day_data['status'] in ['leave', 'leave_with_punch']:
reason = day_data.get('leave_reason', '请假')
if day_data['status'] == 'leave_with_punch':
return f'请假({reason}) - 有打卡记录'
else:
return f'请假({reason})'
elif day_data['status'] == 'weekend_rest':
return '休息日'
elif day_data['status'] == 'weekend_work':
return '周末加班'
remarks = []
for record in day_data['records']:
if record.get('status') == 'late':
remarks.append(f"迟到{record.get('late_minutes', 0)}分钟")
elif record.get('status') == 'early_leave':
remarks.append(f"早退{record.get('early_minutes', 0)}分钟")
elif record.get('status') == 'missing':
remarks.append("缺卡")
elif record.get('status') == 'leave':
reason = record.get('leave_reason', '请假')
remarks.append(f"请假({reason})")
return '; '.join(remarks) if remarks else '正常'
def add_name_mapping(self, feishu_name: str, real_name: str):
    """Register (or overwrite) a Feishu user name -> real name mapping and log it."""
    self.feishu_name_mapping.update({feishu_name: real_name})
    logger.info(f"添加用户名映射: {feishu_name} -> {real_name}")
def get_name_mappings(self) -> Dict[str, str]:
"""获取当前的用户名映射表"""
return self.feishu_name_mapping.copy()
# ============== 以下是新增的请假处理方法 ==============
def parse_leave_file(self, file_path: str) -> List[Dict]:
    """Parse a leave-request Excel file into a list of leave records.

    Columns are recognised by keywords in their labels: name ('请假人员' or
    '姓名'), reason ('请假事由' or '事由'), start ('请假开始时间' or
    '开始时间'), end ('请假结束时间' or '结束时间'). A column is assigned to
    the first role it matches; when several columns match a role the last
    one wins. Name, start and end columns are required.

    Rows with an empty name, or whose dates cannot be converted, are
    skipped. A row that raises is reported on stdout and skipped.

    Returns:
        List of dicts with 'name', 'reason', 'start_date', 'end_date'
        ('YYYY-MM-DD' strings), 'raw_start' and 'raw_end'.

    Raises:
        ValueError: when a required column is missing.
        Exception: any read/parse error, after it has been logged.
    """
    try:
        df = pd.read_excel(file_path)
        logger.info(f"成功读取请假单文件: {file_path}")

        separator = "=" * 50
        print(separator)
        print("请假单文件列名:")
        for i, col in enumerate(df.columns):
            print(f"{i}列: {col}")
        print(separator)
        print("前5行数据")
        print(df.head(5))
        print(separator)

        # Locate the relevant columns by keyword, checking roles in this order.
        role_keywords = (
            ('name', ('请假人员', '姓名')),
            ('reason', ('请假事由', '事由')),
            ('start', ('请假开始时间', '开始时间')),
            ('end', ('请假结束时间', '结束时间')),
        )
        located = {'name': None, 'reason': None, 'start': None, 'end': None}
        for col in df.columns:
            col_str = str(col)
            for role, keywords in role_keywords:
                if any(keyword in col_str for keyword in keywords):
                    located[role] = col
                    break
        name_col = located['name']
        reason_col = located['reason']
        start_col = located['start']
        end_col = located['end']

        print(f"识别到的列:姓名={name_col}, 事由={reason_col}, 开始时间={start_col}, 结束时间={end_col}")
        if not all([name_col, start_col, end_col]):
            raise ValueError("请假单文件缺少必要的列:请假人员、请假开始时间、请假结束时间")

        leave_records = []
        for index, row in df.iterrows():
            try:
                raw_name = row[name_col]
                print(f"\n处理请假记录 {index + 1}:")
                print(f" 原始姓名: '{raw_name}' (类型: {type(raw_name)})")

                # Skip blank rows and rows whose name cell is empty.
                if pd.isna(raw_name) or str(raw_name).strip() in ('', 'nan'):
                    print(f" 跳过空姓名")
                    continue

                name = self._normalize_student_name(str(raw_name).strip())
                if not name:
                    print(f" 姓名标准化后为空,跳过")
                    continue

                # Fall back to the generic reason when no reason is given.
                if reason_col and pd.notna(row[reason_col]):
                    reason = str(row[reason_col]).strip()
                else:
                    reason = "请假"
                start_time_raw = row[start_col]
                end_time_raw = row[end_col]

                print(f" 标准化姓名: '{name}'")
                print(f" 事由: '{reason}'")
                print(f" 开始时间原始值: {start_time_raw} (类型: {type(start_time_raw)})")
                print(f" 结束时间原始值: {end_time_raw} (类型: {type(end_time_raw)})")

                # Convert both values to 'YYYY-MM-DD'.
                start_date = self._convert_excel_date(start_time_raw)
                end_date = self._convert_excel_date(end_time_raw)
                if not (start_date and end_date):
                    print(f" ❌ 时间转换失败,跳过此记录")
                    continue

                leave_records.append({
                    'name': name,
                    'reason': reason,
                    'start_date': start_date,
                    'end_date': end_date,
                    'raw_start': start_time_raw,
                    'raw_end': end_time_raw
                })
                print(f" ✅ 成功添加请假记录: {start_date}{end_date}")
            except Exception as e:
                print(f" ❌ 处理第 {index + 1} 行时出错: {e}")
                continue

        print(f"\n📊 成功解析请假记录 {len(leave_records)}")
        for i, record in enumerate(leave_records, 1):
            print(f" {i}. {record['name']}: {record['start_date']}{record['end_date']} ({record['reason']})")
        return leave_records
    except Exception as e:
        logger.error(f"解析请假单文件失败: {e}")
        raise
def _convert_excel_date(self, date_value) -> Optional[str]:
"""转换Excel中的日期值为标准日期格式"""
if pd.isna(date_value):
return None
try:
print(f" 转换日期: {date_value} (类型: {type(date_value)})")
# 如果是数字Excel日期序列号
if isinstance(date_value, (int, float)):
# Excel日期起始点是1900-01-01但需要处理Excel的闰年错误
if date_value > 59: # 1900-03-01之后
date_value -= 1
# 转换为日期
excel_date = datetime(1900, 1, 1) + timedelta(days=date_value - 1)
result = excel_date.strftime('%Y-%m-%d')
print(f" 数字转换结果: {result}")
return result
# 如果是字符串
elif isinstance(date_value, str):
date_value = date_value.strip()
# 尝试解析各种日期格式
date_formats = [
'%Y-%m-%d',
'%Y/%m/%d',
'%m/%d/%Y',
'%d/%m/%Y',
'%Y-%m-%d %H:%M:%S',
'%Y/%m/%d %H:%M:%S'
]
for fmt in date_formats:
try:
parsed_date = datetime.strptime(date_value, fmt)
result = parsed_date.strftime('%Y-%m-%d')
print(f" 字符串转换结果: {result}")
return result
except ValueError:
continue
# 如果都不匹配尝试pandas的日期解析
try:
parsed_date = pd.to_datetime(date_value)
result = parsed_date.strftime('%Y-%m-%d')
print(f" pandas转换结果: {result}")
return result
except:
pass
# 如果是datetime对象
elif isinstance(date_value, datetime):
result = date_value.strftime('%Y-%m-%d')
print(f" datetime转换结果: {result}")
return result
# 如果是pandas的Timestamp
elif hasattr(date_value, 'strftime'):
result = date_value.strftime('%Y-%m-%d')
print(f" timestamp转换结果: {result}")
return result
except Exception as e:
print(f" ❌ 日期转换失败: {date_value} -> {e}")
return None
def apply_leave_records(self, attendance_data: Dict, leave_records: List[Dict],
                        week_start: str, week_end: str) -> Dict:
    """Mark leave days in the attendance data (modified in place).

    For every student in ``attendance_data`` and every leave record with
    the same name, each leave day that falls inside
    ``week_start``..``week_end`` is processed:

    * existing day with status 'absent' or 'workday' -> status becomes
      'leave_with_punch' if some record has a valid status and a time,
      otherwise 'leave'; 'leave_reason' is set. Punch records are kept.
    * existing day with any other status -> left unchanged.
    * no entry for that date -> a new 'leave' entry is created.

    Returns:
        The same ``attendance_data`` object.
    """
    print(f"\n🔄 开始应用请假记录到考勤数据")
    print(f"考勤数据覆盖周期: {week_start}{week_end}")
    print(f"请假记录数量: {len(leave_records)}")

    week_start_date = datetime.strptime(week_start, '%Y-%m-%d').date()
    week_end_date = datetime.strptime(week_end, '%Y-%m-%d').date()

    for student_name, daily_data in attendance_data.items():
        print(f"\n👤 处理学生: {student_name}")

        student_leaves = [item for item in leave_records if item['name'] == student_name]
        if not student_leaves:
            print(f" 无请假记录")
            continue
        print(f" 📋 找到请假记录 {len(student_leaves)}")

        for leave in student_leaves:
            leave_start = datetime.strptime(leave['start_date'], '%Y-%m-%d').date()
            leave_end = datetime.strptime(leave['end_date'], '%Y-%m-%d').date()
            print(f" 📝 处理请假: {leave['start_date']}{leave['end_date']} ({leave['reason']})")

            # Walk every day of the leave, first to last inclusive.
            for offset in range((leave_end - leave_start).days + 1):
                current_date = leave_start + timedelta(days=offset)
                date_str = current_date.strftime('%Y-%m-%d')

                # Only dates inside the attendance period are touched.
                if not (week_start_date <= current_date <= week_end_date):
                    print(f" ⏭️ 日期 {date_str} 不在考勤周期内,跳过")
                    continue

                print(f" 📅 处理日期: {date_str}")

                if date_str not in daily_data:
                    # No attendance entry for this date: create a leave entry.
                    daily_data[date_str] = {
                        'status': 'leave',
                        'leave_reason': leave['reason'],
                        'records': [],
                        'check_in_time': None,
                        'check_out_time': None
                    }
                    print(f" 创建请假记录")
                    continue

                day_data = daily_data[date_str]
                original_status = day_data.get('status')
                print(f" 原状态: {original_status}")

                if day_data.get('status') not in ['absent', 'workday']:
                    print(f" 非工作日或其他状态,不处理")
                    continue

                # Leave takes priority even when punches exist; the punches are kept.
                has_valid_punch = any(
                    entry.get('status') in ['normal', 'late', 'early_leave']
                    and entry.get('time')
                    for entry in day_data.get('records', [])
                )
                if has_valid_punch:
                    day_data['status'] = 'leave_with_punch'
                    day_data['leave_reason'] = leave['reason']
                    print(f" 🎯 转换为请假(有打卡): {leave['reason']}")
                else:
                    day_data['status'] = 'leave'
                    day_data['leave_reason'] = leave['reason']
                    print(f" 🎯 转换为请假: {leave['reason']}")

    print(f"\n✅ 请假记录应用完成")
    return attendance_data
def import_leave_records_to_database(self, leave_records: List[Dict]) -> int:
    """Store leave records, updating rows that already exist.

    For each record the student is looked up by name (records for unknown
    students are skipped). A ``LeaveRecord`` with the same student number,
    start date and end date is updated; otherwise a new one is added. Every
    stored record gets status '已批准' - uploaded leave forms are assumed to
    be approved. A record that raises is reported on stdout and skipped.
    All changes are committed once at the end.

    Returns:
        Number of records created or updated.

    Raises:
        Exception: any error outside the per-record handling (e.g. the
            commit); the session is rolled back before re-raising.
    """
    from app.models import LeaveRecord

    success_count = 0
    try:
        for leave in leave_records:
            try:
                student = Student.query.filter_by(name=leave['name']).first()
                if not student:
                    print(f"未找到学生: {leave['name']}")
                    continue

                begins_on = datetime.strptime(leave['start_date'], '%Y-%m-%d').date()
                ends_on = datetime.strptime(leave['end_date'], '%Y-%m-%d').date()

                # Look for an identical leave period of the same student.
                existing_leave = LeaveRecord.query.filter_by(
                    student_number=student.student_number,
                    leave_start_date=begins_on,
                    leave_end_date=ends_on
                ).first()

                if existing_leave:
                    existing_leave.leave_reason = leave['reason']
                    existing_leave.status = '已批准'
                    print(f"更新请假记录: {leave['name']}")
                else:
                    db.session.add(LeaveRecord(
                        student_number=student.student_number,
                        leave_start_date=begins_on,
                        leave_end_date=ends_on,
                        leave_reason=leave['reason'],
                        status='已批准'
                    ))
                    print(f"创建请假记录: {leave['name']}")
                success_count += 1
            except Exception as e:
                print(f"处理请假记录失败 {leave['name']}: {e}")
                continue
        db.session.commit()
        print(f"请假记录导入完成: {success_count}")
    except Exception as e:
        db.session.rollback()
        logger.error(f"请假记录导入失败: {e}")
        raise
    return success_count