CHM_attendance/app/utils/attendance_importer.py
superlishunqin 77b57a9876 0914
2025-09-14 01:28:47 +08:00

1626 lines
71 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import re
from datetime import datetime, timedelta, time
from typing import Dict, List, Tuple, Optional
import random
from app.models import db, Student, WeeklyAttendance, DailyAttendanceDetail
import logging
logger = logging.getLogger(__name__)
class AttendanceDataImporter:
def __init__(self):
self.work_time_rules = {
'morning': {
'work_start': time(9, 45),
'work_end': time(11, 30),
'card_start': time(6, 0),
'card_end': time(12, 0)
},
'afternoon': {
'work_start': time(13, 30),
'work_end': time(18, 30),
'card_start': time(13, 30),
'card_end': time(18, 30)
},
'evening': {
'work_start': time(19, 0),
'work_end': time(23, 30),
'card_start': time(19, 0),
'card_end': time(23, 30)
}
}
# 飞书用户名映射表
self.feishu_name_mapping = {
"飞书用户8903SN": "马一格",
"飞书用户9645ON": "张欣"
}
# 特殊处理的学号
self.special_student_number = "23320241154608"
def _is_evening_period(self, period: str) -> bool:
"""判断是否为晚上时段"""
return period.startswith('evening_')
def _normalize_student_name(self, name: str) -> str:
"""标准化学生姓名,处理飞书用户名替换"""
if pd.isna(name) or not name:
return None
name = str(name).strip()
# 检查是否是飞书用户名,如果是则替换为真实姓名
if name in self.feishu_name_mapping:
original_name = name
real_name = self.feishu_name_mapping[name]
logger.info(f"替换飞书用户名: {original_name} -> {real_name}")
print(f"替换飞书用户名: {original_name} -> {real_name}")
return real_name
return name
def _generate_normal_punch_time(self, period: str) -> str:
"""为特定时段生成合理的正常打卡时间"""
if period == 'morning_in':
# 早上上班7:50-9:30随机
hour_minute_ranges = [
(7, 50, 59), # 7:50-7:59
(8, 0, 59), # 8:00-8:59
(9, 0, 30) # 9:00-9:30
]
hour, min_start, min_end = random.choice(hour_minute_ranges)
minute = random.randint(min_start, min_end)
elif period == 'morning_out':
# 早上下班11:30-11:59随机
hour = 11
minute = random.randint(30, 59)
elif period == 'afternoon_in':
# 下午上班13:30-14:30随机
hour_minute_ranges = [
(13, 30, 59), # 13:30-13:59
(14, 0, 30) # 14:00-14:30
]
hour, min_start, min_end = random.choice(hour_minute_ranges)
minute = random.randint(min_start, min_end)
elif period == 'afternoon_out':
# 下午下班17:30-18:30随机
hour_minute_ranges = [
(17, 30, 59), # 17:30-17:59
(18, 0, 30) # 18:00-18:30
]
hour, min_start, min_end = random.choice(hour_minute_ranges)
minute = random.randint(min_start, min_end)
else:
# 默认时间(不应该被调用)
hour = 9
minute = 0
return f"{hour:02d}:{minute:02d}"
def _fix_special_student_attendance(self, daily_data: Dict, student_name: str) -> Dict:
"""修正特定学号学生的考勤记录"""
# 首先检查学生是否为特殊处理学号
# 注意:这里先不查询数据库,在批量导入时会优化
return daily_data # 暂时返回原数据,在批量处理时再检查
def parse_xlsx_file(self, file_path: str) -> Dict:
"""解析xlsx文件"""
try:
# 读取Excel文件包含多行表头
df = pd.read_excel(file_path, header=[0, 1]) # 读取两行作为表头
logger.info(f"成功读取文件: {file_path}")
# 调试信息:打印列名和前几行数据
print("=" * 50)
print("Excel文件列名多层表头")
for i, col in enumerate(df.columns):
print(f"{i}列: {col}")
print("=" * 50)
print("前3行数据")
print(df.head(3))
print("=" * 50)
raw_data = self._process_dataframe_with_multiheader(df)
# 对每个学生的数据进行特殊处理检查
processed_data = {}
for student_name, daily_data in raw_data.items():
processed_data[student_name] = self._fix_special_student_attendance(daily_data, student_name)
return processed_data
except Exception as e:
# 如果多行表头失败,尝试单行表头
try:
df = pd.read_excel(file_path)
print("使用单行表头重新读取")
print("Excel文件列名")
for i, col in enumerate(df.columns):
print(f"{i}列: {col}")
print("前3行数据")
print(df.head(3))
raw_data = self._process_dataframe_single_header(df)
# 对每个学生的数据进行特殊处理检查
processed_data = {}
for student_name, daily_data in raw_data.items():
processed_data[student_name] = self._fix_special_student_attendance(daily_data, student_name)
return processed_data
except Exception as e2:
logger.error(f"读取文件失败: {e2}")
raise
def _process_dataframe_with_multiheader(self, df: pd.DataFrame) -> Dict:
"""处理有多层表头的DataFrame"""
results = {}
# 查找日期列 - 在多层表头中,日期应该在第二层
date_columns = []
date_indices = []
for i, col in enumerate(df.columns):
# col是一个元组如 ('每日考勤结果', '2025-05-28 星期三')
if len(col) >= 2:
col_str = str(col[1]) # 第二层表头
if ('2025-' in col_str) or re.search(r'\d{4}-\d{2}-\d{2}', col_str):
date_columns.append(col)
date_indices.append(i)
print(f"识别到的日期列: {date_columns}")
print(f"日期列索引: {date_indices}")
# 处理每行数据
for index, row in df.iterrows():
# 姓名通常在第一列
name = None
for col in df.columns:
if '姓名' in str(col[0]) or '姓名' in str(col[1]):
name = row[col]
break
# 标准化姓名(处理飞书用户名)
name = self._normalize_student_name(name)
if not name:
continue
print(f"\n处理学生: {name}")
# 解析每日考勤数据
daily_data = {}
for date_col in date_columns:
# 从列名中提取日期
date_str = self._extract_date_from_column(str(date_col[1]))
if date_str:
attendance_str = str(row[date_col])
print(f" {date_str}: {attendance_str}")
daily_data[date_str] = self._parse_daily_attendance(attendance_str)
results[name] = daily_data
return results
def _process_dataframe_single_header(self, df: pd.DataFrame) -> Dict:
"""处理单层表头的DataFrame"""
results = {}
# 查找姓名列和日期列
name_col_index = None
date_columns = []
for i, col in enumerate(df.columns):
col_str = str(col)
if '姓名' in col_str:
name_col_index = i
elif ('2025-' in col_str) or re.search(r'\d{4}-\d{2}-\d{2}', col_str):
date_columns.append(col)
print(f"姓名列索引: {name_col_index}")
print(f"识别到的日期列: {date_columns}")
if name_col_index is None:
# 如果没找到姓名列,假设第一列是姓名
name_col_index = 0
# 处理每行数据
for index, row in df.iterrows():
name = row.iloc[name_col_index] if name_col_index is not None else row.iloc[0]
# 标准化姓名(处理飞书用户名)
name = self._normalize_student_name(name)
if not name:
continue
print(f"\n处理学生: {name}")
# 解析每日考勤数据
daily_data = {}
for date_col in date_columns:
# 从列名中提取日期
date_str = self._extract_date_from_column(str(date_col))
if date_str:
attendance_str = str(row[date_col])
print(f" {date_str}: {attendance_str}")
daily_data[date_str] = self._parse_daily_attendance(attendance_str)
results[name] = daily_data
return results
def _extract_date_from_column(self, col_name: str) -> str:
"""从列名中提取日期"""
# 尝试匹配 YYYY-MM-DD 格式
date_match = re.search(r'(\d{4}-\d{2}-\d{2})', col_name)
if date_match:
return date_match.group(1)
return None
def _parse_daily_attendance(self, attendance_str: str) -> Dict:
"""解析单日考勤字符串"""
if pd.isna(attendance_str) or attendance_str == 'nan':
return {'status': 'absent', 'records': [], 'check_in_time': None, 'check_out_time': None}
print(f" 解析考勤字符串: {attendance_str}")
if '休息' in attendance_str:
result = self._parse_weekend_attendance(attendance_str)
print(f" 周末考勤结果: {result}")
return result
# 解析工作日考勤
records = []
parts = attendance_str.split(',')
time_periods = ['morning_in', 'morning_out', 'afternoon_in', 'afternoon_out', 'evening_in', 'evening_out']
# 解析各时段打卡记录
for i, part in enumerate(parts):
if i >= len(time_periods):
break
part = part.strip()
period = time_periods[i]
print(f" 处理时段 {period}: {part}")
# 🔥 关键修改:对晚上时段的特殊处理
if self._is_evening_period(period):
if '缺卡' in part:
# 晚上缺卡视为正常(可打可不打)
records.append({'period': period, 'status': 'evening_normal', 'time': None})
print(f" 晚上缺卡转为正常(可打可不打)")
elif '正常' in part:
time_match = re.search(r'\((\d{2}:\d{2})\)', part)
card_time = time_match.group(1) if time_match else None
records.append({'period': period, 'status': 'normal', 'time': card_time})
print(f" 晚上正常打卡时间: {card_time}")
elif '迟到' in part:
# 晚上迟到也视为正常
time_match = re.search(r'\((\d{2}:\d{2})\)', part)
late_match = re.search(r'迟到(\d+)分钟', part)
card_time = time_match.group(1) if time_match else None
late_minutes = int(late_match.group(1)) if late_match else 0
records.append({
'period': period,
'status': 'evening_late_normal', # 🔥 新状态:晚上迟到但视为正常
'time': card_time,
'late_minutes': late_minutes
})
print(f" 晚上迟到转为正常,打卡时间: {card_time}, 迟到分钟: {late_minutes}")
elif '早退' in part:
time_match = re.search(r'\((\d{2}:\d{2})\)', part)
early_match = re.search(r'早退(\d+)分钟', part)
card_time = time_match.group(1) if time_match else None
early_minutes = int(early_match.group(1)) if early_match else 0
records.append({
'period': period,
'status': 'evening_early_normal', # 🔥 新状态:晚上早退但视为正常
'time': card_time,
'early_minutes': early_minutes
})
print(f" 晚上早退转为正常,打卡时间: {card_time}, 早退分钟: {early_minutes}")
else:
# 早上和下午时段的正常处理
if '缺卡' in part:
records.append({'period': period, 'status': 'missing', 'time': None})
elif '正常' in part:
time_match = re.search(r'\((\d{2}:\d{2})\)', part)
card_time = time_match.group(1) if time_match else None
records.append({'period': period, 'status': 'normal', 'time': card_time})
print(f" 正常打卡时间: {card_time}")
elif '迟到' in part:
time_match = re.search(r'\((\d{2}:\d{2})\)', part)
late_match = re.search(r'迟到(\d+)分钟', part)
card_time = time_match.group(1) if time_match else None
late_minutes = int(late_match.group(1)) if late_match else 0
records.append({
'period': period,
'status': 'late',
'time': card_time,
'late_minutes': late_minutes
})
print(f" 迟到打卡时间: {card_time}, 迟到分钟: {late_minutes}")
elif '早退' in part:
time_match = re.search(r'\((\d{2}:\d{2})\)', part)
early_match = re.search(r'早退(\d+)分钟', part)
card_time = time_match.group(1) if time_match else None
early_minutes = int(early_match.group(1)) if early_match else 0
records.append({
'period': period,
'status': 'early_leave',
'time': card_time,
'early_minutes': early_minutes
})
print(f" 早退打卡时间: {card_time}, 早退分钟: {early_minutes}")
# 计算签到签退时间
check_in_time, check_out_time = self._calculate_check_times(records)
# 🔥 新增:检查是否全天缺卡(只考虑早上和下午)
has_valid_punch = any(record.get('status') in ['normal', 'late', 'early_leave'] and record.get('time')
and not self._is_evening_period(record.get('period', ''))
for record in records)
# 如果早上和下午没有任何有效打卡记录,标记为缺勤
if not has_valid_punch:
status = 'absent'
print(f" 检测到早上和下午无有效打卡,标记为缺勤")
else:
status = 'workday'
result = {
'status': status,
'records': records,
'check_in_time': check_in_time,
'check_out_time': check_out_time
}
print(f" 工作日考勤结果: {result}")
return result
def _calculate_check_times(self, records: List[Dict]) -> Tuple[Optional[str], Optional[str]]:
"""从打卡记录中计算签到和签退时间"""
check_in_time = None
check_out_time = None
# 查找最早的有效签到时间(包含晚上的正常处理)
for record in records:
if (record['period'].endswith('_in') and record['time'] and
record['status'] in ['normal', 'late', 'evening_late_normal']):
if not check_in_time or record['time'] < check_in_time:
check_in_time = record['time']
# 查找最晚的有效签退时间(包含晚上的正常处理)
for record in records:
if (record['period'].endswith('_out') and record['time'] and
record['status'] in ['normal', 'early_leave', 'evening_early_normal']):
if not check_out_time or record['time'] > check_out_time:
check_out_time = record['time']
print(f" 计算签到签退时间: 签到={check_in_time}, 签退={check_out_time}")
return check_in_time, check_out_time
def _parse_weekend_attendance(self, attendance_str: str) -> Dict:
"""解析周末考勤"""
if '休息(-,-)' in attendance_str:
return {
'status': 'weekend_rest',
'records': [],
'check_in_time': None,
'check_out_time': None
}
# 解析周末加班
time_match = re.search(r'休息打卡\((\d{2}:\d{2}),?(\d{2}:\d{2})?\)', attendance_str)
if time_match:
start_time = time_match.group(1)
end_time = time_match.group(2) if time_match.group(2) else None
return {
'status': 'weekend_work',
'records': [{'start': start_time, 'end': end_time}],
'check_in_time': start_time,
'check_out_time': end_time
}
return {
'status': 'weekend_rest',
'records': [],
'check_in_time': None,
'check_out_time': None
}
def calculate_weekly_statistics(self, daily_data: Dict, week_start: str, week_end: str) -> Dict:
"""计算周统计数据"""
stats = {
'actual_work_hours': 0.0,
'class_work_hours': 0.0,
'absent_days': 0,
'overtime_hours': 0.0
}
print(f"\n计算周统计数据,周期: {week_start}{week_end}")
start_date = datetime.strptime(week_start, '%Y-%m-%d')
end_date = datetime.strptime(week_end, '%Y-%m-%d')
current_date = start_date
while current_date <= end_date:
date_str = current_date.strftime('%Y-%m-%d')
is_weekday = current_date.weekday() < 5
print(f"处理日期 {date_str}, 是否工作日: {is_weekday}")
if date_str in daily_data:
day_data = daily_data[date_str]
print(f" 找到数据: {day_data}")
if day_data['status'] == 'workday':
# 🔥 修改:检查早上和下午是否有有效打卡记录
valid_records = [record for record in day_data['records']
if (record.get('status') in ['normal', 'late', 'early_leave']
and record.get('time')
and not self._is_evening_period(record.get('period', '')))]
if not valid_records and is_weekday:
# 虽然标记为工作日,但早上和下午没有有效打卡记录,算作缺勤
stats['absent_days'] += 1
print(f" 工作日早上下午无有效打卡记录,记为缺勤")
else:
# 有有效打卡记录,计算工时
day_stats = self._calculate_daily_hours(day_data['records'], is_weekday)
print(f" 计算得到工时: {day_stats}")
stats['actual_work_hours'] += day_stats['actual_hours']
if is_weekday:
stats['class_work_hours'] += day_stats['actual_hours']
else:
stats['overtime_hours'] += day_stats['actual_hours']
elif day_data['status'] == 'weekend_work':
overtime = self._calculate_weekend_overtime(day_data['records'])
print(f" 周末加班时长: {overtime}")
stats['actual_work_hours'] += overtime
stats['overtime_hours'] += overtime
elif day_data['status'] == 'absent' and is_weekday:
stats['absent_days'] += 1
print(f" 缺勤")
elif is_weekday:
stats['absent_days'] += 1
print(f" 工作日无数据,记为缺勤")
current_date += timedelta(days=1)
print(f"最终统计结果: {stats}")
return stats
def _calculate_daily_hours(self, records: List[Dict], is_weekday: bool) -> Dict:
"""计算每日工作时长"""
total_hours = 0.0
print(f" 计算每日工时,记录: {records}")
# 处理各时段
morning_in = None
morning_out = None
afternoon_in = None
afternoon_out = None
evening_in = None
evening_out = None
for record in records:
# 🔥 修改:处理晚上的特殊状态
valid_statuses = ['normal', 'late', 'early_leave', 'evening_late_normal', 'evening_early_normal']
if record['period'] == 'morning_in' and record['status'] in valid_statuses and record['time']:
morning_in = datetime.strptime(record['time'], '%H:%M').time()
print(f" 早上上班时间: {morning_in}")
elif record['period'] == 'morning_out' and record['status'] in valid_statuses and record['time']:
morning_out = datetime.strptime(record['time'], '%H:%M').time()
print(f" 早上下班时间: {morning_out}")
elif record['period'] == 'afternoon_in' and record['status'] in valid_statuses and record['time']:
afternoon_in = datetime.strptime(record['time'], '%H:%M').time()
print(f" 下午上班时间: {afternoon_in}")
elif record['period'] == 'afternoon_out' and record['status'] in valid_statuses and record['time']:
afternoon_out = datetime.strptime(record['time'], '%H:%M').time()
print(f" 下午下班时间: {afternoon_out}")
elif record['period'] == 'evening_in' and record['status'] in valid_statuses and record['time']:
evening_in = datetime.strptime(record['time'], '%H:%M').time()
print(f" 晚上上班时间: {evening_in}")
elif record['period'] == 'evening_out' and record['status'] in valid_statuses and record['time']:
evening_out = datetime.strptime(record['time'], '%H:%M').time()
print(f" 晚上下班时间: {evening_out}")
# 计算各时段工时
if morning_in and morning_out:
morning_hours = self._calculate_time_diff(morning_in, morning_out)
total_hours += morning_hours
print(f" 早上工时: {morning_hours}")
if afternoon_in and afternoon_out:
afternoon_hours = self._calculate_time_diff(afternoon_in, afternoon_out)
total_hours += afternoon_hours
print(f" 下午工时: {afternoon_hours}")
if evening_in and evening_out:
evening_hours = self._calculate_time_diff(evening_in, evening_out)
total_hours += evening_hours
print(f" 晚上工时: {evening_hours}")
print(f" 总工时: {total_hours}")
return {'actual_hours': total_hours}
def _calculate_weekend_overtime(self, records: List[Dict]) -> float:
"""计算周末加班时长"""
if not records or not records[0].get('start'):
return 0.0
start_time = datetime.strptime(records[0]['start'], '%H:%M').time()
end_time = None
if records[0].get('end'):
end_time = datetime.strptime(records[0]['end'], '%H:%M').time()
if start_time and end_time:
return self._calculate_time_diff(start_time, end_time)
return 0.0
def _calculate_time_diff(self, start_time: time, end_time: time) -> float:
"""计算时间差(小时)"""
start_minutes = start_time.hour * 60 + start_time.minute
end_minutes = end_time.hour * 60 + end_time.minute
if end_minutes < start_minutes: # 跨天
end_minutes += 24 * 60
diff_minutes = end_minutes - start_minutes
result = round(diff_minutes / 60.0, 1)
print(f" 时间差计算: {start_time}{end_time} = {diff_minutes}分钟 = {result}小时")
return result
def _get_file_date_range(self, data: Dict) -> Tuple[Optional[str], Optional[str]]:
"""🔥 新增:获取文件中的实际日期范围"""
all_dates = set()
for student_data in data.values():
all_dates.update(student_data.keys())
if not all_dates:
return None, None
sorted_dates = sorted(all_dates)
return sorted_dates[0], sorted_dates[-1]
def _check_data_conflicts(self, week_start: str, week_end: str) -> List[str]:
"""🔥 新增:第一重校验 - 检查时间冲突"""
print(f"\n🔍 第一重校验:检查时间冲突")
print(f"用户选择周期: {week_start}{week_end}")
week_start_date = datetime.strptime(week_start, '%Y-%m-%d').date()
week_end_date = datetime.strptime(week_end, '%Y-%m-%d').date()
# 查询所有可能冲突的记录
conflicting_records = WeeklyAttendance.query.filter(
# 检查是否有时间重叠
db.or_(
# 新周期开始日期在现有周期内
db.and_(
WeeklyAttendance.week_start_date <= week_start_date,
WeeklyAttendance.week_end_date >= week_start_date
),
# 新周期结束日期在现有周期内
db.and_(
WeeklyAttendance.week_start_date <= week_end_date,
WeeklyAttendance.week_end_date >= week_end_date
),
# 新周期完全包含现有周期
db.and_(
WeeklyAttendance.week_start_date >= week_start_date,
WeeklyAttendance.week_end_date <= week_end_date
)
)
).all()
if conflicting_records:
conflict_info = []
for record in conflicting_records:
conflict_info.append(
f"学生 {record.name}{record.week_start_date}{record.week_end_date} 已有考勤记录"
)
print(f"❌ 发现 {len(conflicting_records)} 个冲突记录")
for info in conflict_info[:5]: # 最多显示5个
print(f" - {info}")
return conflict_info
print(f"✅ 未发现时间冲突")
return []
def _check_file_coverage(self, data: Dict, week_start: str, week_end: str) -> bool:
"""🔥 新增:第二重校验 - 检查文件覆盖度"""
print(f"\n🔍 第二重校验:检查文件覆盖度")
file_start, file_end = self._get_file_date_range(data)
if not file_start or not file_end:
print(f"❌ 文件中没有任何日期数据")
return False
print(f"文件日期范围: {file_start}{file_end}")
print(f"用户选择周期: {week_start}{week_end}")
# 检查用户选择的周期是否在文件日期范围内
if week_start < file_start or week_end > file_end:
print(f"❌ 用户选择周期超出文件日期范围")
return False
print(f"✅ 文件覆盖度检查通过")
return True
def _check_data_completeness(self, data: Dict, week_start: str, week_end: str) -> bool:
"""🔥 新增:第三重校验 - 检查数据完整性"""
print(f"\n🔍 第三重校验:检查数据完整性")
start_date = datetime.strptime(week_start, '%Y-%m-%d')
end_date = datetime.strptime(week_end, '%Y-%m-%d')
total_expected_data_points = 0
total_valid_data_points = 0
# 遍历每个学生
for student_name, daily_data in data.items():
print(f" 检查学生 {student_name} 的数据完整性")
# 遍历选择周期内的每一天
current_date = start_date
while current_date <= end_date:
date_str = current_date.strftime('%Y-%m-%d')
total_expected_data_points += 1
if date_str in daily_data:
day_data = daily_data[date_str]
# 检查是否有有效数据(不是纯缺勤)
if day_data.get('status') != 'absent' or day_data.get('records'):
total_valid_data_points += 1
print(f" {date_str}: 有效数据")
else:
print(f" {date_str}: 缺勤/无效数据")
else:
print(f" {date_str}: 无数据")
current_date += timedelta(days=1)
print(f"总期望数据点: {total_expected_data_points}")
print(f"有效数据点: {total_valid_data_points}")
# 如果完全没有有效数据,认为是数据为空
if total_valid_data_points == 0:
print(f"❌ 选择周期内无任何有效打卡数据")
return False
print(f"✅ 数据完整性检查通过")
return True
def import_to_database(self, data: Dict, week_start: str, week_end: str):
"""🔥 优化后的导入数据到数据库方法 - 包含三重校验"""
print(f"\n🚀 开始带三重校验的数据库导入,共{len(data)}个学生")
try:
# 🔥 第一重校验:时间冲突检测
conflicts = self._check_data_conflicts(week_start, week_end)
if conflicts:
error_message = "有内容重复上传,请检查选择时间"
print(f"{error_message}")
return 0, 1, [error_message] + conflicts[:3] # 返回前3个冲突详情
# 🔥 第二重校验:文件覆盖度验证
if not self._check_file_coverage(data, week_start, week_end):
error_message = "上传打卡记录为空"
print(f"{error_message}")
return 0, 1, [error_message]
# 🔥 第三重校验:数据完整性检查
if not self._check_data_completeness(data, week_start, week_end):
error_message = "上传打卡记录为空"
print(f"{error_message}")
return 0, 1, [error_message]
print(f"✅ 所有校验通过,开始导入数据")
# 通过所有校验后,执行原有的导入逻辑
return self._execute_import(data, week_start, week_end)
except Exception as e:
logger.error(f"数据导入校验失败: {e}")
return 0, 1, [f"导入校验失败: {str(e)}"]
def _execute_import(self, data: Dict, week_start: str, week_end: str):
"""🔥 执行实际的数据导入(原有逻辑)"""
success_count = 0
error_count = 0
error_messages = []
print(f"\n💾 执行数据导入操作")
try:
# 🔥 步骤1批量查询所有相关的学生信息
print("📊 步骤1: 批量查询学生信息...")
all_student_names = list(data.keys())
students_dict = {s.name: s for s in Student.query.filter(Student.name.in_(all_student_names)).all()}
# 处理特殊学号的学生
special_student = None
for student in students_dict.values():
if student.student_number == self.special_student_number:
special_student = student
break
print(f"找到学生记录: {len(students_dict)}")
# 🔥 步骤2批量查询现有的周考勤记录这个查询在校验阶段已确保不会有冲突
print("📊 步骤2: 批量查询现有周考勤记录...")
week_start_date = datetime.strptime(week_start, '%Y-%m-%d').date()
week_end_date = datetime.strptime(week_end, '%Y-%m-%d').date()
existing_records_query = WeeklyAttendance.query.filter(
WeeklyAttendance.week_start_date == week_start_date,
WeeklyAttendance.week_end_date == week_end_date,
WeeklyAttendance.student_number.in_([s.student_number for s in students_dict.values()])
).all()
existing_records_dict = {r.student_number: r for r in existing_records_query}
print(f"找到现有考勤记录: {len(existing_records_dict)}")
# 🔥 步骤3批量删除现有的每日考勤明细
print("🗑️ 步骤3: 批量删除现有每日考勤明细...")
existing_record_ids = [r.record_id for r in existing_records_query]
if existing_record_ids:
deleted_count = DailyAttendanceDetail.query.filter(
DailyAttendanceDetail.weekly_record_id.in_(existing_record_ids)
).delete(synchronize_session=False)
print(f"删除现有每日明细: {deleted_count}")
# 🔥 步骤4准备批量数据
print("📋 步骤4: 准备批量数据...")
weekly_records_to_add = []
daily_details_data = []
for name, daily_data in data.items():
try:
print(f"\n处理学生: {name}")
# 获取学生信息
student = students_dict.get(name)
if not student:
error_messages.append(f"未找到学生: {name}")
error_count += 1
print(f" 未找到学生记录")
continue
print(f" 找到学生: {student.student_number}")
# 🔥 特殊学生处理:应用特殊规则
if special_student and student.student_number == self.special_student_number:
print(f" 应用特殊学号处理规则")
daily_data = self._apply_special_student_rules(daily_data, week_start, week_end)
# 计算周统计
weekly_stats = self.calculate_weekly_statistics(daily_data, week_start, week_end)
# 准备周记录数据
weekly_record_data = {
'student_number': student.student_number,
'name': name,
'week_start_date': week_start_date,
'week_end_date': week_end_date,
'actual_work_hours': weekly_stats['actual_work_hours'],
'class_work_hours': weekly_stats['class_work_hours'],
'absent_days': weekly_stats['absent_days'],
'overtime_hours': weekly_stats['overtime_hours'],
'updated_at': datetime.now()
}
# 检查是否已存在记录
existing_record = existing_records_dict.get(student.student_number)
if existing_record:
print(f" 更新现有记录")
# 更新现有记录
for key, value in weekly_record_data.items():
if key not in ['student_number', 'name', 'week_start_date', 'week_end_date']:
setattr(existing_record, key, value)
weekly_record_id = existing_record.record_id
else:
print(f" 准备创建新记录")
# 准备新记录数据
weekly_record_data['created_at'] = datetime.now()
weekly_records_to_add.append(weekly_record_data)
weekly_record_id = None # 将在插入后获取
# 准备每日考勤明细数据
daily_details_batch = self._prepare_daily_details_batch(
weekly_record_id, student.student_number, daily_data,
week_start, week_end, existing_record is not None
)
daily_details_data.extend(daily_details_batch)
success_count += 1
except Exception as e:
error_messages.append(f"处理学生 {name} 时出错: {str(e)}")
error_count += 1
print(f" 处理失败: {e}")
continue
# 🔥 步骤5批量插入新的周记录
print(f"💾 步骤5: 批量插入周记录,新增 {len(weekly_records_to_add)}")
if weekly_records_to_add:
db.session.execute(
WeeklyAttendance.__table__.insert(),
weekly_records_to_add
)
db.session.flush()
# 获取新插入记录的ID更新每日明细的weekly_record_id
new_records = WeeklyAttendance.query.filter(
WeeklyAttendance.week_start_date == week_start_date,
WeeklyAttendance.week_end_date == week_end_date,
WeeklyAttendance.student_number.in_([r['student_number'] for r in weekly_records_to_add])
).all()
new_records_dict = {r.student_number: r.record_id for r in new_records}
# 更新每日明细中的weekly_record_id
for detail in daily_details_data:
if detail['weekly_record_id'] is None:
detail['weekly_record_id'] = new_records_dict.get(detail['student_number'])
# 🔥 步骤6批量插入每日考勤明细
print(f"💾 步骤6: 批量插入每日明细,共 {len(daily_details_data)}")
if daily_details_data:
# 清理None的weekly_record_id
valid_daily_details = [d for d in daily_details_data if d.get('weekly_record_id') is not None]
print(f"有效每日明细记录: {len(valid_daily_details)}")
if valid_daily_details:
db.session.bulk_insert_mappings(DailyAttendanceDetail, valid_daily_details)
# 🔥 步骤7提交事务
print("✅ 步骤7: 提交事务")
db.session.commit()
logger.info(f"优化版数据导入完成: 成功 {success_count} 条,失败 {error_count}")
print(f"🎉 导入完成: 成功 {success_count} 条,失败 {error_count}")
except Exception as e:
db.session.rollback()
logger.error(f"数据导入失败: {e}")
raise
return success_count, error_count, error_messages
def _apply_special_student_rules(self, daily_data: Dict, week_start: str, week_end: str) -> Dict:
"""应用特殊学生的考勤规则"""
print(f" 🔧 应用特殊学号考勤规则")
fixed_data = {}
week_start_date = datetime.strptime(week_start, '%Y-%m-%d').date()
week_end_date = datetime.strptime(week_end, '%Y-%m-%d').date()
# 遍历周期内的所有日期
current_date = week_start_date
while current_date <= week_end_date:
date_str = current_date.strftime('%Y-%m-%d')
is_weekday = current_date.weekday() < 5 # 0-4是工作日
if not is_weekday:
# 非工作日保持原样
if date_str in daily_data:
fixed_data[date_str] = daily_data[date_str]
current_date += timedelta(days=1)
continue
print(f" 处理工作日 {date_str}")
# 为工作日创建完整的打卡记录
day_data = daily_data.get(date_str, {'status': 'absent', 'records': []})
# 🔥 新逻辑:分析原始记录,构建各时段的记录映射
original_records = {}
evening_records = []
if day_data.get('records'):
for record in day_data['records']:
period = record.get('period')
if period and period.startswith('evening_'):
# 晚上记录单独保存
evening_records.append(record)
elif period:
# 早上和下午记录保存到映射中
original_records[period] = record
print(f" 原始早上下午记录: {list(original_records.keys())}")
print(f" 原始晚上记录数量: {len(evening_records)}")
# 🔥 智能处理早上和下午的打卡记录
fixed_records = []
periods = ['morning_in', 'morning_out', 'afternoon_in', 'afternoon_out']
for period in periods:
original_record = original_records.get(period)
if original_record:
# 有原始记录,检查状态
original_status = original_record.get('status')
original_time = original_record.get('time')
if original_status == 'normal' and original_time:
# 🔥 正常打卡保持不变
print(f" {period}: 保持原有正常打卡 ({original_time})")
fixed_records.append({
'period': period,
'status': 'normal',
'time': original_time
})
else:
# 🔥 有问题的打卡(迟到、缺卡等)才修正
new_time = self._generate_normal_punch_time(period)
print(f" {period}: 修正问题打卡 {original_status} -> normal ({new_time})")
fixed_records.append({
'period': period,
'status': 'normal',
'time': new_time
})
else:
# 🔥 没有原始记录,生成正常打卡
new_time = self._generate_normal_punch_time(period)
print(f" {period}: 生成缺失打卡 -> normal ({new_time})")
fixed_records.append({
'period': period,
'status': 'normal',
'time': new_time
})
# 添加晚上记录(保持原样)
fixed_records.extend(evening_records)
print(f" 保留晚上记录: {len(evening_records)}")
# 如果没有晚上记录,添加默认的晚上记录
has_evening_in = any(r['period'] == 'evening_in' for r in evening_records)
has_evening_out = any(r['period'] == 'evening_out' for r in evening_records)
if not has_evening_in:
fixed_records.append({
'period': 'evening_in',
'status': 'evening_normal',
'time': None
})
print(f" evening_in: 添加默认晚上记录")
if not has_evening_out:
fixed_records.append({
'period': 'evening_out',
'status': 'evening_normal',
'time': None
})
print(f" evening_out: 添加默认晚上记录")
# 重新计算签到签退时间
check_in_time, check_out_time = self._calculate_check_times(fixed_records)
fixed_data[date_str] = {
'status': 'workday',
'records': fixed_records,
'check_in_time': check_in_time,
'check_out_time': check_out_time
}
print(f" 最终状态: workday, 签到: {check_in_time}, 签退: {check_out_time}")
current_date += timedelta(days=1)
return fixed_data
def _prepare_daily_details_batch(self, weekly_record_id: Optional[int], student_number: str,
daily_data: Dict, week_start: str, week_end: str,
is_update: bool) -> List[Dict]:
"""准备每日考勤明细的批量插入数据"""
daily_details_batch = []
start_date = datetime.strptime(week_start, '%Y-%m-%d')
end_date = datetime.strptime(week_end, '%Y-%m-%d')
current_date = start_date
while current_date <= end_date:
date_str = current_date.strftime('%Y-%m-%d')
status = '缺勤'
remarks = '无数据'
check_in_time = None
check_out_time = None
if date_str in daily_data:
day_data = daily_data[date_str]
status = self._get_daily_status(day_data)
remarks = self._generate_remarks(day_data)
# 提取签到签退时间
if day_data.get('check_in_time'):
try:
check_in_time = datetime.strptime(day_data['check_in_time'], '%H:%M').time()
except:
check_in_time = None
if day_data.get('check_out_time'):
try:
check_out_time = datetime.strptime(day_data['check_out_time'], '%H:%M').time()
except:
check_out_time = None
# 生成详细的时段记录
detailed_records = self._generate_detailed_records(day_data)
if detailed_records:
import json
remarks = json.dumps({
'summary': remarks,
'details': detailed_records
}, ensure_ascii=False)
daily_detail_data = {
'weekly_record_id': weekly_record_id, # 可能为None后面会更新
'student_number': student_number,
'attendance_date': current_date.date(),
'status': status,
'check_in_time': check_in_time,
'check_out_time': check_out_time,
'remarks': remarks,
'created_at': datetime.now()
}
daily_details_batch.append(daily_detail_data)
current_date += timedelta(days=1)
return daily_details_batch
def _generate_detailed_records(self, day_data: Dict) -> Dict:
"""生成详细的时段打卡记录"""
if day_data['status'] in ['weekend_rest', 'absent']:
return None
detailed = {
'morning': {'in': None, 'out': None, 'status': 'missing'},
'afternoon': {'in': None, 'out': None, 'status': 'missing'},
'evening': {'in': None, 'out': None, 'status': 'missing'}
}
if day_data['status'] == 'weekend_work':
# 处理周末加班
if day_data['records']:
record = day_data['records'][0]
detailed['overtime'] = {
'in': record.get('start'),
'out': record.get('end'),
'status': 'overtime'
}
return detailed
# 处理工作日打卡
for record in day_data['records']:
period = record['period']
time_str = record.get('time')
status = record.get('status', 'missing')
# 🔥 修改:晚上的特殊状态转换为正常状态显示
if status in ['evening_normal', 'evening_late_normal', 'evening_early_normal']:
display_status = 'normal' # 在详细记录中显示为正常
else:
display_status = status
if period == 'morning_in':
detailed['morning']['in'] = time_str
detailed['morning']['status'] = display_status
# 只有当状态确实是late时才记录迟到分钟数
if status == 'late' and 'late_minutes' in record:
detailed['morning']['late_minutes'] = record.get('late_minutes', 0)
elif period == 'morning_out':
detailed['morning']['out'] = time_str
# 只有当状态确实是early_leave时才记录早退分钟数
if status == 'early_leave' and 'early_minutes' in record:
detailed['morning']['early_minutes'] = record.get('early_minutes', 0)
elif period == 'afternoon_in':
detailed['afternoon']['in'] = time_str
detailed['afternoon']['status'] = display_status
# 只有当状态确实是late时才记录迟到分钟数
if status == 'late' and 'late_minutes' in record:
detailed['afternoon']['late_minutes'] = record.get('late_minutes', 0)
elif period == 'afternoon_out':
detailed['afternoon']['out'] = time_str
# 只有当状态确实是early_leave时才记录早退分钟数
if status == 'early_leave' and 'early_minutes' in record:
detailed['afternoon']['early_minutes'] = record.get('early_minutes', 0)
elif period == 'evening_in':
detailed['evening']['in'] = time_str
detailed['evening']['status'] = display_status
# 🔥 晚上迟到分钟数仍然记录,但不影响整体状态
if status == 'evening_late_normal' and 'late_minutes' in record:
detailed['evening']['late_minutes'] = record.get('late_minutes', 0)
elif period == 'evening_out':
detailed['evening']['out'] = time_str
# 🔥 晚上早退分钟数仍然记录,但不影响整体状态
if status == 'evening_early_normal' and 'early_minutes' in record:
detailed['evening']['early_minutes'] = record.get('early_minutes', 0)
return detailed
def _get_daily_status(self, day_data: Dict) -> str:
"""获取每日状态 - 重写版本,精确判断缺勤逻辑"""
if day_data['status'] == 'absent':
return '缺勤'
elif day_data['status'] == 'leave':
return '请假'
elif day_data['status'] == 'leave_with_punch': # 新增:有打卡的请假
return '请假'
elif day_data['status'] == 'weekend_rest':
return '休息'
elif day_data['status'] == 'weekend_work':
return '加班'
else:
# 🔥 新逻辑:检查早上和下午的打卡完整性
# 分析早上打卡情况
morning_in_record = None
morning_out_record = None
afternoon_in_record = None
afternoon_out_record = None
for record in day_data['records']:
period = record.get('period', '')
if not self._is_evening_period(period): # 忽略晚上时段
if period == 'morning_in':
morning_in_record = record
elif period == 'morning_out':
morning_out_record = record
elif period == 'afternoon_in':
afternoon_in_record = record
elif period == 'afternoon_out':
afternoon_out_record = record
# 🔥 检查早上打卡完整性
morning_complete = (
morning_in_record and
morning_in_record.get('status') in ['normal', 'late', 'early_leave'] and
morning_in_record.get('time') and
morning_out_record and
morning_out_record.get('status') in ['normal', 'late', 'early_leave'] and
morning_out_record.get('time')
)
# 🔥 检查下午打卡完整性
afternoon_complete = (
afternoon_in_record and
afternoon_in_record.get('status') in ['normal', 'late', 'early_leave'] and
afternoon_in_record.get('time') and
afternoon_out_record and
afternoon_out_record.get('status') in ['normal', 'late', 'early_leave'] and
afternoon_out_record.get('time')
)
print(f" 状态分析: 早上完整={morning_complete}, 下午完整={afternoon_complete}")
# 🔥 关键修改:如果早上或下午任一时段不完整,就是缺勤
if not morning_complete and not afternoon_complete:
print(f" → 早上下午都不完整 → 缺勤")
return '缺勤'
elif not morning_complete:
print(f" → 早上不完整 → 缺勤")
return '缺勤'
elif not afternoon_complete:
print(f" → 下午不完整 → 缺勤")
return '缺勤'
# 🔥 早上和下午都完整,检查是否有迟到
has_late = False
for record in day_data['records']:
if (record.get('status') == 'late'
and not self._is_evening_period(record.get('period', ''))):
has_late = True
print(f" → 发现早上/下午迟到 → 迟到")
break
if has_late:
return '迟到'
print(f" → 早上下午都完整且正常 → 正常")
return '正常'
def _generate_remarks(self, day_data: Dict) -> str:
"""生成备注信息 - 调整版本,与新状态逻辑保持一致"""
if day_data['status'] == 'absent':
return '缺勤'
elif day_data['status'] in ['leave', 'leave_with_punch']:
reason = day_data.get('leave_reason', '请假')
if day_data['status'] == 'leave_with_punch':
return f'请假({reason}) - 有打卡记录'
else:
return f'请假({reason})'
elif day_data['status'] == 'weekend_rest':
return '休息日'
elif day_data['status'] == 'weekend_work':
return '周末加班'
# 🔥 修改:更精确的备注生成逻辑
remarks = []
# 分析早上和下午的打卡情况
morning_issues = []
afternoon_issues = []
for record in day_data['records']:
period = record.get('period', '')
status = record.get('status', '')
# 跳过晚上时段的问题记录
if self._is_evening_period(period):
continue
# 按时段分类记录问题
if period.startswith('morning_'):
if status == 'late':
morning_issues.append(f"迟到{record.get('late_minutes', 0)}分钟")
elif status == 'early_leave':
morning_issues.append(f"早退{record.get('early_minutes', 0)}分钟")
elif status == 'missing':
morning_issues.append("缺卡")
elif period.startswith('afternoon_'):
if status == 'late':
afternoon_issues.append(f"迟到{record.get('late_minutes', 0)}分钟")
elif status == 'early_leave':
afternoon_issues.append(f"早退{record.get('early_minutes', 0)}分钟")
elif status == 'missing':
afternoon_issues.append("缺卡")
# 🔥 检查是否有完整时段缺勤
morning_in_record = None
morning_out_record = None
afternoon_in_record = None
afternoon_out_record = None
for record in day_data['records']:
period = record.get('period', '')
if period == 'morning_in':
morning_in_record = record
elif period == 'morning_out':
morning_out_record = record
elif period == 'afternoon_in':
afternoon_in_record = record
elif period == 'afternoon_out':
afternoon_out_record = record
# 检查早上时段缺勤
morning_complete = (
morning_in_record and morning_in_record.get('time') and
morning_out_record and morning_out_record.get('time')
)
# 检查下午时段缺勤
afternoon_complete = (
afternoon_in_record and afternoon_in_record.get('time') and
afternoon_out_record and afternoon_out_record.get('time')
)
if not morning_complete:
remarks.append("早上缺勤")
elif morning_issues:
remarks.extend([f"早上{issue}" for issue in morning_issues])
if not afternoon_complete:
remarks.append("下午缺勤")
elif afternoon_issues:
remarks.extend([f"下午{issue}" for issue in afternoon_issues])
return '; '.join(remarks) if remarks else '正常'
def add_name_mapping(self, feishu_name: str, real_name: str):
"""添加新的飞书用户名映射"""
self.feishu_name_mapping[feishu_name] = real_name
logger.info(f"添加用户名映射: {feishu_name} -> {real_name}")
def get_name_mappings(self) -> Dict[str, str]:
"""获取当前的用户名映射表"""
return self.feishu_name_mapping.copy()
# ============== 以下是新增的请假处理方法 ==============
def parse_leave_file(self, file_path: str) -> List[Dict]:
"""解析请假单文件"""
try:
# 读取Excel文件
df = pd.read_excel(file_path)
logger.info(f"成功读取请假单文件: {file_path}")
print("=" * 50)
print("请假单文件列名:")
for i, col in enumerate(df.columns):
print(f"{i}列: {col}")
print("=" * 50)
print("前5行数据") # 增加显示行数
print(df.head(5))
print("=" * 50)
leave_records = []
# 查找相关列
name_col = None
reason_col = None
start_col = None
end_col = None
for col in df.columns:
col_str = str(col)
if '请假人员' in col_str or '姓名' in col_str:
name_col = col
elif '请假事由' in col_str or '事由' in col_str:
reason_col = col
elif '请假开始时间' in col_str or '开始时间' in col_str:
start_col = col
elif '请假结束时间' in col_str or '结束时间' in col_str:
end_col = col
print(f"识别到的列:姓名={name_col}, 事由={reason_col}, 开始时间={start_col}, 结束时间={end_col}")
if not all([name_col, start_col, end_col]):
raise ValueError("请假单文件缺少必要的列:请假人员、请假开始时间、请假结束时间")
# 处理每行数据
for index, row in df.iterrows():
try:
# 🔥 改进姓名处理逻辑
raw_name = row[name_col]
print(f"\n处理请假记录 {index + 1}:")
print(f" 原始姓名: '{raw_name}' (类型: {type(raw_name)})")
# 跳过空行或标题行
if pd.isna(raw_name) or str(raw_name).strip() == '' or str(raw_name).strip() == 'nan':
print(f" 跳过空姓名")
continue
name = self._normalize_student_name(str(raw_name).strip())
if not name:
print(f" 姓名标准化后为空,跳过")
continue
reason = str(row[reason_col]).strip() if reason_col and pd.notna(row[reason_col]) else "请假"
start_time_raw = row[start_col]
end_time_raw = row[end_col]
print(f" 标准化姓名: '{name}'")
print(f" 事由: '{reason}'")
print(f" 开始时间原始值: {start_time_raw} (类型: {type(start_time_raw)})")
print(f" 结束时间原始值: {end_time_raw} (类型: {type(end_time_raw)})")
# 转换时间格式
start_date = self._convert_excel_date(start_time_raw)
end_date = self._convert_excel_date(end_time_raw)
if start_date and end_date:
leave_record = {
'name': name,
'reason': reason,
'start_date': start_date,
'end_date': end_date,
'raw_start': start_time_raw,
'raw_end': end_time_raw
}
leave_records.append(leave_record)
print(f" ✅ 成功添加请假记录: {start_date}{end_date}")
else:
print(f" ❌ 时间转换失败,跳过此记录")
except Exception as e:
print(f" ❌ 处理第 {index + 1} 行时出错: {e}")
continue
print(f"\n📊 成功解析请假记录 {len(leave_records)}")
for i, record in enumerate(leave_records, 1):
print(f" {i}. {record['name']}: {record['start_date']}{record['end_date']} ({record['reason']})")
return leave_records
except Exception as e:
logger.error(f"解析请假单文件失败: {e}")
raise
def _convert_excel_date(self, date_value) -> Optional[str]:
"""转换Excel中的日期值为标准日期格式"""
if pd.isna(date_value):
return None
try:
print(f" 转换日期: {date_value} (类型: {type(date_value)})")
# 如果是数字Excel日期序列号
if isinstance(date_value, (int, float)):
# Excel日期起始点是1900-01-01但需要处理Excel的闰年错误
if date_value > 59: # 1900-03-01之后
date_value -= 1
# 转换为日期
excel_date = datetime(1900, 1, 1) + timedelta(days=date_value - 1)
result = excel_date.strftime('%Y-%m-%d')
print(f" 数字转换结果: {result}")
return result
# 如果是字符串
elif isinstance(date_value, str):
date_value = date_value.strip()
# 尝试解析各种日期格式
date_formats = [
'%Y-%m-%d',
'%Y/%m/%d',
'%m/%d/%Y',
'%d/%m/%Y',
'%Y-%m-%d %H:%M:%S',
'%Y/%m/%d %H:%M:%S'
]
for fmt in date_formats:
try:
parsed_date = datetime.strptime(date_value, fmt)
result = parsed_date.strftime('%Y-%m-%d')
print(f" 字符串转换结果: {result}")
return result
except ValueError:
continue
# 如果都不匹配尝试pandas的日期解析
try:
parsed_date = pd.to_datetime(date_value)
result = parsed_date.strftime('%Y-%m-%d')
print(f" pandas转换结果: {result}")
return result
except:
pass
# 如果是datetime对象
elif isinstance(date_value, datetime):
result = date_value.strftime('%Y-%m-%d')
print(f" datetime转换结果: {result}")
return result
# 如果是pandas的Timestamp
elif hasattr(date_value, 'strftime'):
result = date_value.strftime('%Y-%m-%d')
print(f" timestamp转换结果: {result}")
return result
except Exception as e:
print(f" ❌ 日期转换失败: {date_value} -> {e}")
return None
def apply_leave_records(self, attendance_data: Dict, leave_records: List[Dict],
week_start: str, week_end: str) -> Dict:
"""将请假记录应用到考勤数据中"""
print(f"\n🔄 开始应用请假记录到考勤数据")
print(f"考勤数据覆盖周期: {week_start}{week_end}")
print(f"请假记录数量: {len(leave_records)}")
week_start_date = datetime.strptime(week_start, '%Y-%m-%d').date()
week_end_date = datetime.strptime(week_end, '%Y-%m-%d').date()
# 遍历每个学生的考勤数据
for student_name, daily_data in attendance_data.items():
print(f"\n👤 处理学生: {student_name}")
# 查找该学生的请假记录
student_leaves = [leave for leave in leave_records if leave['name'] == student_name]
if not student_leaves:
print(f" 无请假记录")
continue
print(f" 📋 找到请假记录 {len(student_leaves)}")
# 处理每条请假记录
for leave in student_leaves:
leave_start = datetime.strptime(leave['start_date'], '%Y-%m-%d').date()
leave_end = datetime.strptime(leave['end_date'], '%Y-%m-%d').date()
print(f" 📝 处理请假: {leave['start_date']}{leave['end_date']} ({leave['reason']})")
# 遍历请假期间的每一天
current_date = leave_start
while current_date <= leave_end:
date_str = current_date.strftime('%Y-%m-%d')
# 只处理在考勤周期内的日期
if week_start_date <= current_date <= week_end_date:
print(f" 📅 处理日期: {date_str}")
if date_str in daily_data:
day_data = daily_data[date_str]
original_status = day_data.get('status')
print(f" 原状态: {original_status}")
# 🔥 修改:优先设置为请假状态,即使有打卡记录
if day_data.get('status') in ['absent', 'workday']:
# 检查早上和下午是否有有效的打卡记录
has_valid_punch = any(
record.get('status') in ['normal', 'late', 'early_leave']
and record.get('time')
and not self._is_evening_period(record.get('period', ''))
for record in day_data.get('records', [])
)
if has_valid_punch:
# 有打卡记录的情况下,仍然设置为请假,但保留打卡信息
day_data['status'] = 'leave_with_punch' # 新状态:请假但有打卡
day_data['leave_reason'] = leave['reason']
print(f" 🎯 转换为请假(有打卡): {leave['reason']}")
else:
# 无打卡记录,设置为纯请假
day_data['status'] = 'leave'
day_data['leave_reason'] = leave['reason']
print(f" 🎯 转换为请假: {leave['reason']}")
else:
print(f" 非工作日或其他状态,不处理")
else:
# 如果该日期没有考勤记录,创建请假记录
daily_data[date_str] = {
'status': 'leave',
'leave_reason': leave['reason'],
'records': [],
'check_in_time': None,
'check_out_time': None
}
print(f" 创建请假记录")
else:
print(f" ⏭️ 日期 {date_str} 不在考勤周期内,跳过")
current_date += timedelta(days=1)
print(f"\n✅ 请假记录应用完成")
return attendance_data
def import_leave_records_to_database(self, leave_records: List[Dict]) -> int:
"""将请假记录导入到数据库"""
from app.models import LeaveRecord
success_count = 0
try:
for leave in leave_records:
try:
# 查找学生
student = Student.query.filter_by(name=leave['name']).first()
if not student:
print(f"未找到学生: {leave['name']}")
continue
# 检查是否已存在相同的请假记录
existing_leave = LeaveRecord.query.filter_by(
student_number=student.student_number,
leave_start_date=datetime.strptime(leave['start_date'], '%Y-%m-%d').date(),
leave_end_date=datetime.strptime(leave['end_date'], '%Y-%m-%d').date()
).first()
if existing_leave:
# 更新现有记录
existing_leave.leave_reason = leave['reason']
existing_leave.status = '已批准' # 假设上传的请假单都是已批准的
print(f"更新请假记录: {leave['name']}")
else:
# 创建新记录
leave_record = LeaveRecord(
student_number=student.student_number,
leave_start_date=datetime.strptime(leave['start_date'], '%Y-%m-%d').date(),
leave_end_date=datetime.strptime(leave['end_date'], '%Y-%m-%d').date(),
leave_reason=leave['reason'],
status='已批准' # 假设上传的请假单都是已批准的
)
db.session.add(leave_record)
print(f"创建请假记录: {leave['name']}")
success_count += 1
except Exception as e:
print(f"处理请假记录失败 {leave['name']}: {e}")
continue
db.session.commit()
print(f"请假记录导入完成: {success_count}")
except Exception as e:
db.session.rollback()
logger.error(f"请假记录导入失败: {e}")
raise
return success_count