superlishunqin 77b57a9876 0914
2025-09-14 01:28:47 +08:00

1843 lines
74 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, send_file
from flask_login import login_required, current_user
from app.models import db, User, Student, WeeklyAttendance, DailyAttendanceDetail, LeaveRecord
from app.utils.auth_helpers import admin_required
from app.utils.database import safe_add_and_commit, safe_commit, safe_delete_and_commit
from datetime import datetime, timedelta
from sqlalchemy import and_, or_, desc, func
import pandas as pd
import io
import re
from werkzeug.security import generate_password_hash
from app.utils.attendance_importer import AttendanceDataImporter
from werkzeug.utils import secure_filename
import os
import tempfile
admin_bp = Blueprint('admin', __name__)
@admin_bp.route('/dashboard')
@admin_required
def dashboard():
"""管理员主页"""
# 统计数据
total_students = Student.query.count()
total_attendance_records = WeeklyAttendance.query.count()
pending_leaves = LeaveRecord.query.filter_by(status='待审批').count()
# 最近一周的考勤统计
week_ago = datetime.now().date() - timedelta(days=7)
recent_records = WeeklyAttendance.query.filter(
WeeklyAttendance.week_start_date >= week_ago
).count()
# 按学院统计学生数量
college_stats = db.session.query(
Student.college,
func.count(Student.student_id).label('count')
).group_by(Student.college).all()
# 按导师统计学生数量
supervisor_stats = db.session.query(
Student.supervisor,
func.count(Student.student_id).label('count')
).group_by(Student.supervisor).order_by(desc('count')).limit(10).all()
# 最近的请假申请
recent_leaves = LeaveRecord.query.filter_by(
status='待审批'
).order_by(desc(LeaveRecord.created_at)).limit(5).all()
return render_template('admin/dashboard.html',
total_students=total_students,
total_attendance_records=total_attendance_records,
pending_leaves=pending_leaves,
recent_records=recent_records,
college_stats=college_stats,
supervisor_stats=supervisor_stats,
recent_leaves=recent_leaves)
@admin_bp.route('/students')
@admin_required
def student_list():
"""学生列表"""
page = request.args.get('page', 1, type=int)
per_page = 20
# 搜索和筛选
search = request.args.get('search', '').strip()
college = request.args.get('college', '').strip()
supervisor = request.args.get('supervisor', '').strip()
grade = request.args.get('grade', '', type=str)
query = Student.query
if search:
query = query.filter(or_(
Student.name.contains(search),
Student.student_number.contains(search)
))
if college:
query = query.filter(Student.college == college)
if supervisor:
query = query.filter(Student.supervisor == supervisor)
if grade:
try:
grade_int = int(grade)
query = query.filter(Student.grade == grade_int)
except ValueError:
pass
pagination = query.order_by(Student.student_number).paginate(
page=page, per_page=per_page, error_out=False
)
students = pagination.items
# 获取筛选选项
colleges = db.session.query(Student.college).distinct().all()
colleges = [c[0] for c in colleges if c[0]]
supervisors = db.session.query(Student.supervisor).distinct().all()
supervisors = [s[0] for s in supervisors if s[0]]
grades = db.session.query(Student.grade).distinct().all()
grades = sorted([g[0] for g in grades if g[0]])
return render_template('admin/student_list.html',
students=students,
pagination=pagination,
colleges=colleges,
supervisors=supervisors,
grades=grades,
search=search,
selected_college=college,
selected_supervisor=supervisor,
selected_grade=grade)
@admin_bp.route('/students/<student_number>')
@admin_required
def student_detail(student_number):
"""学生详细信息"""
import json
student = Student.query.filter_by(student_number=student_number).first_or_404()
# 获取考勤记录
attendance_records = WeeklyAttendance.query.filter_by(
student_number=student_number
).order_by(desc(WeeklyAttendance.week_start_date)).limit(10).all()
# 🔥 新增:为每个考勤记录计算真实的缺勤次数和迟到次数
def calculate_record_counts(record):
"""为单个考勤记录计算真实的缺勤次数和迟到次数"""
# 获取该记录的每日明细
daily_details = DailyAttendanceDetail.query.filter_by(
weekly_record_id=record.record_id
).all()
late_count = 0
absent_count = 0
for detail in daily_details:
# 🔥 处理完全缺勤的情况
if detail.status == '缺勤' and (not detail.remarks or not detail.remarks.startswith('{')):
# 完全缺勤的天数,早上+下午都缺勤
absent_count += 2 # 早上缺勤1次 + 下午缺勤1次
continue
if detail.remarks and detail.remarks.startswith('{'):
try:
remarks_data = json.loads(detail.remarks)
details_info = remarks_data.get('details', {})
# 统计迟到次数
for period in ['morning', 'afternoon', 'evening']:
period_data = details_info.get(period, {})
if period_data.get('status') == 'late':
late_count += 1
# 统计缺勤次数
# 检查早上缺勤morning_in AND morning_out 都missing
morning_data = details_info.get('morning', {})
morning_in_time = morning_data.get('in')
morning_out_time = morning_data.get('out')
morning_in_status = morning_data.get('status', 'missing')
if ((not morning_in_time or morning_in_status == 'missing') and
(not morning_out_time)):
absent_count += 1
# 检查下午缺勤afternoon_in AND afternoon_out 都missing
afternoon_data = details_info.get('afternoon', {})
afternoon_in_time = afternoon_data.get('in')
afternoon_out_time = afternoon_data.get('out')
afternoon_in_status = afternoon_data.get('status', 'missing')
if ((not afternoon_in_time or afternoon_in_status == 'missing') and
(not afternoon_out_time)):
absent_count += 1
except (json.JSONDecodeError, KeyError, AttributeError):
continue
return late_count, absent_count
# 🔥 为每个考勤记录添加计算属性
total_absent_count = 0
total_late_count = 0
for record in attendance_records:
late_count, absent_count = calculate_record_counts(record)
record.late_count = late_count
record.absent_count = absent_count
total_absent_count += absent_count
total_late_count += late_count
# 获取请假记录
leave_records = LeaveRecord.query.filter_by(
student_number=student_number
).order_by(desc(LeaveRecord.created_at)).limit(10).all()
# 统计数据
total_work_hours = db.session.query(
func.sum(WeeklyAttendance.actual_work_hours)
).filter_by(student_number=student_number).scalar() or 0
# 🔥 修改使用计算的缺勤次数而不是数据库的absent_days
print(f"学生 {student.name} - 总缺勤次数: {total_absent_count}, 总迟到次数: {total_late_count}")
return render_template('admin/student_detail.html',
student=student,
attendance_records=attendance_records,
leave_records=leave_records,
total_work_hours=float(total_work_hours),
total_absent_count=int(total_absent_count), # 🔥 新字段
total_late_count=int(total_late_count)) # 🔥 新字段
@admin_bp.route('/attendance')
@admin_required
def attendance_management():
"""考勤管理"""
# ========== 导出功能处理 ==========
if request.args.get('export') == 'excel':
try:
return export_attendance_data()
except Exception as e:
flash(f'导出失败: {str(e)}', 'error')
# 移除export参数重定向到正常页面
args = dict(request.args)
args.pop('export', None)
return redirect(url_for('admin.attendance_management', **args))
# ==================================
from sqlalchemy import desc, func, case, or_
import json
page = request.args.get('page', 1, type=int)
per_page = 50
# 筛选条件
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
student_search = request.args.get('student_search', '').strip()
sort_by = request.args.get('sort_by', 'week_start_date_desc') # 默认按周开始日期降序
# 🔥 新增:高级搜索参数
supervisors = request.args.getlist('supervisor') # 多选导师
grades = request.args.getlist('grade') # 多选年级
print(f"收到排序参数: {sort_by}") # 调试信息
# 🔥 修改构建基础查询不再在SQL中计算迟到次数
query = WeeklyAttendance.query
# 应用筛选条件
if start_date:
try:
start_date_obj = datetime.strptime(start_date, '%Y-%m-%d').date()
query = query.filter(WeeklyAttendance.week_start_date >= start_date_obj)
except ValueError:
flash('开始日期格式错误', 'error')
if end_date:
try:
end_date_obj = datetime.strptime(end_date, '%Y-%m-%d').date()
query = query.filter(WeeklyAttendance.week_end_date <= end_date_obj)
except ValueError:
flash('结束日期格式错误', 'error')
if student_search:
query = query.filter(or_(
WeeklyAttendance.name.contains(student_search),
WeeklyAttendance.student_number.contains(student_search)
))
# 🔥 新增:应用高级搜索筛选条件
if supervisors or grades:
# 需要连接Student表来筛选导师和年级
query = query.join(Student, WeeklyAttendance.student_number == Student.student_number)
if supervisors:
# 导师多选筛选
query = query.filter(Student.supervisor.in_(supervisors))
if grades:
# 年级多选筛选,转换为整数
grade_ints = []
for grade in grades:
try:
grade_ints.append(int(grade))
except ValueError:
continue
if grade_ints:
query = query.filter(Student.grade.in_(grade_ints))
# 🔥 修改先获取基础记录稍后在Python中计算迟到次数和缺勤次数
base_records = query.all()
# 🔥 性能优化:批量查询所有每日明细数据
print("🚀 批量查询每日明细数据...")
record_ids = [r.record_id for r in base_records]
if record_ids:
# 一次性查询所有相关的每日明细
all_daily_details = DailyAttendanceDetail.query.filter(
DailyAttendanceDetail.weekly_record_id.in_(record_ids)
).all()
# 按record_id分组
daily_details_by_record = {}
for detail in all_daily_details:
record_id = detail.weekly_record_id
if record_id not in daily_details_by_record:
daily_details_by_record[record_id] = []
daily_details_by_record[record_id].append(detail)
print(f"批量查询完成,共获取 {len(all_daily_details)} 条每日明细")
else:
daily_details_by_record = {}
# 🔥 优化:批量计算迟到次数和缺勤次数
def calculate_counts_batch(daily_details_dict: dict) -> dict:
"""批量计算所有记录的迟到次数和缺勤次数 - 修复版本"""
counts = {}
for record_id, daily_details in daily_details_dict.items():
late_count = 0
absent_count = 0
for detail in daily_details:
# 🔥 新增:处理完全缺勤的情况
if detail.status == '缺勤' and (not detail.remarks or not detail.remarks.startswith('{')):
# 完全缺勤的天数,早上+下午都缺勤
absent_count += 2 # 早上缺勤1次 + 下午缺勤1次
print(f" 完全缺勤日期 {detail.attendance_date}: +2次")
continue
if detail.remarks and detail.remarks.startswith('{'):
try:
remarks_data = json.loads(detail.remarks)
details = remarks_data.get('details', {})
# 统计迟到次数
for period in ['morning', 'afternoon', 'evening']:
period_data = details.get(period, {})
if period_data.get('status') == 'late':
late_count += 1
# 统计缺勤次数
# 检查早上缺勤morning_in AND morning_out 都missing
morning_data = details.get('morning', {})
morning_in_time = morning_data.get('in')
morning_out_time = morning_data.get('out')
morning_in_status = morning_data.get('status', 'missing')
if ((not morning_in_time or morning_in_status == 'missing') and
(not morning_out_time)):
absent_count += 1
print(f" 部分缺勤日期 {detail.attendance_date}: 早上缺勤 +1次")
# 检查下午缺勤afternoon_in AND afternoon_out 都missing
afternoon_data = details.get('afternoon', {})
afternoon_in_time = afternoon_data.get('in')
afternoon_out_time = afternoon_data.get('out')
afternoon_in_status = afternoon_data.get('status', 'missing')
if ((not afternoon_in_time or afternoon_in_status == 'missing') and
(not afternoon_out_time)):
absent_count += 1
print(f" 部分缺勤日期 {detail.attendance_date}: 下午缺勤 +1次")
except (json.JSONDecodeError, KeyError, AttributeError):
continue
counts[record_id] = {
'late_count': late_count,
'absent_count': absent_count
}
print(f" 记录{record_id}最终缺勤次数: {absent_count}")
return counts
# 🔥 执行批量计算
print("🔍 开始批量计算迟到次数和缺勤次数...")
all_counts = calculate_counts_batch(daily_details_by_record)
# 🔥 为每个记录添加计算属性
records_with_counts = []
for record in base_records:
counts = all_counts.get(record.record_id, {'late_count': 0, 'absent_count': 0})
record.late_count = counts['late_count']
record.absent_count = counts['absent_count']
records_with_counts.append(record)
print(f"批量计算完成,处理了 {len(records_with_counts)} 条记录")
# 🔥 修改处理排序现在基于Python计算的迟到次数和缺勤次数
if sort_by and '_' in sort_by:
field, direction = sort_by.rsplit('_', 1)
print(f"排序字段: {field}, 方向: {direction}") # 调试信息
if direction not in ['asc', 'desc']:
direction = 'desc'
# 根据字段设置排序
if field == 'late_count':
# 🔥 按真实迟到次数排序
records_with_counts.sort(
key=lambda x: x.late_count,
reverse=(direction == 'desc')
)
elif field == 'absent_count' or field == 'absent_days': # 兼容旧的absent_days参数
# 🔥 新增:按真实缺勤次数排序
records_with_counts.sort(
key=lambda x: x.absent_count,
reverse=(direction == 'desc')
)
elif field == 'actual_work_hours':
records_with_counts.sort(
key=lambda x: x.actual_work_hours,
reverse=(direction == 'desc')
)
elif field == 'class_work_hours':
records_with_counts.sort(
key=lambda x: x.class_work_hours,
reverse=(direction == 'desc')
)
elif field == 'overtime_hours':
records_with_counts.sort(
key=lambda x: x.overtime_hours,
reverse=(direction == 'desc')
)
elif field == 'created_at':
records_with_counts.sort(
key=lambda x: x.created_at,
reverse=(direction == 'desc')
)
elif field == 'week_start_date':
records_with_counts.sort(
key=lambda x: x.week_start_date,
reverse=(direction == 'desc')
)
else:
# 未知字段,使用默认排序
records_with_counts.sort(
key=lambda x: x.week_start_date,
reverse=True
)
else:
# 默认排序:按周开始日期降序
records_with_counts.sort(
key=lambda x: x.week_start_date,
reverse=True
)
# 🔥 修改:手动实现分页
total_count = len(records_with_counts)
start_index = (page - 1) * per_page
end_index = start_index + per_page
attendance_records = records_with_counts[start_index:end_index]
# 🔥 完善分页对象,添加 iter_pages 方法
class SimplePagination:
def __init__(self, page, per_page, total, items):
self.page = page
self.per_page = per_page
self.total = total
self.items = items
self.pages = (total + per_page - 1) // per_page # 总页数
self.prev_num = page - 1 if page > 1 else None
self.next_num = page + 1 if page < self.pages else None
self.has_prev = page > 1
self.has_next = page < self.pages
def iter_pages(self, left_edge=2, left_current=2, right_current=3, right_edge=2):
"""生成分页页码列表与Flask-SQLAlchemy的Pagination兼容"""
last = self.pages
for num in range(1, last + 1):
if (num <= left_edge or
(num > self.page - left_current - 1 and num < self.page + right_current) or
num > last - right_edge):
yield num
pagination = SimplePagination(
page=page,
per_page=per_page,
total=total_count,
items=attendance_records
)
print(f"查询结果: {len(attendance_records)} 条记录") # 调试信息
if attendance_records:
print(f"第一条记录迟到次数: {attendance_records[0].late_count}") # 调试信息
print(f"第一条记录缺勤次数: {attendance_records[0].absent_count}") # 调试信息
# ========== 计算请假统计 ==========
statistics = None
if attendance_records:
# 获取当前筛选结果中的所有考勤记录ID
record_ids = [record.record_id for record in attendance_records]
# 统计请假天数
leave_count = DailyAttendanceDetail.query.filter(
DailyAttendanceDetail.weekly_record_id.in_(record_ids),
DailyAttendanceDetail.status == '请假'
).count()
statistics = {
'total_leave_days': leave_count
}
# 🔥 新增:获取所有导师和年级用于高级搜索选项
all_supervisors = db.session.query(Student.supervisor).distinct().filter(
Student.supervisor.isnot(None), Student.supervisor != ''
).order_by(Student.supervisor).all()
all_supervisors = [s[0] for s in all_supervisors]
all_grades = db.session.query(Student.grade).distinct().filter(
Student.grade.isnot(None)
).order_by(Student.grade).all()
all_grades = [g[0] for g in all_grades]
# 确保总是返回模板
return render_template('admin/attendance_management.html',
attendance_records=attendance_records,
pagination=pagination,
start_date=start_date,
end_date=end_date,
student_search=student_search,
sort_by=sort_by,
statistics=statistics,
all_supervisors=all_supervisors,
all_grades=all_grades,
selected_supervisors=supervisors,
selected_grades=grades)
@admin_bp.route('/upload/attendance', methods=['GET', 'POST'])
@admin_required
def upload_attendance():
"""上传考勤数据"""
if request.method == 'POST':
# 检查考勤记录文件
if 'attendance_file' not in request.files:
flash('请选择考勤记录文件', 'error')
return render_template('admin/upload_attendance.html')
attendance_file = request.files['attendance_file']
if attendance_file.filename == '':
flash('请选择考勤记录文件', 'error')
return render_template('admin/upload_attendance.html')
# 检查请假单文件(可选)
leave_file = request.files.get('leave_file')
has_leave_file = leave_file and leave_file.filename != ''
week_start = request.form.get('week_start')
week_end = request.form.get('week_end')
if not week_start or not week_end:
flash('请选择周开始和结束日期', 'error')
return render_template('admin/upload_attendance.html')
if attendance_file and attendance_file.filename.endswith(('.xlsx', '.xls')):
attendance_filename = secure_filename(attendance_file.filename)
leave_filename = secure_filename(leave_file.filename) if has_leave_file else None
# 使用临时文件
attendance_temp_file = None
leave_temp_file = None
try:
# 保存考勤记录文件
attendance_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx')
attendance_file.save(attendance_temp_file.name)
attendance_temp_file.close()
# 保存请假单文件(如果有)
if has_leave_file and leave_file.filename.endswith(('.xlsx', '.xls')):
leave_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx')
leave_file.save(leave_temp_file.name)
leave_temp_file.close()
# 处理数据
importer = AttendanceDataImporter()
# 解析考勤数据
attendance_data = importer.parse_xlsx_file(attendance_temp_file.name)
# 解析请假数据(如果有)
leave_data = None
if leave_temp_file:
try:
leave_data = importer.parse_leave_file(leave_temp_file.name)
flash(f'成功解析请假记录 {len(leave_data)}', 'info')
except Exception as e:
flash(f'请假单解析失败:{str(e)}', 'warning')
leave_data = None
# 应用请假数据到考勤数据
if leave_data:
attendance_data = importer.apply_leave_records(attendance_data, leave_data, week_start, week_end)
# 导入到数据库
success_count, error_count, error_messages = importer.import_to_database(
attendance_data, week_start, week_end)
# 如果有请假数据,同时保存到请假记录表
if leave_data:
leave_success_count = importer.import_leave_records_to_database(leave_data)
flash(f'请假记录导入:{leave_success_count}', 'info')
if success_count > 0:
message = f'导入完成:成功 {success_count} 条,失败 {error_count}'
if has_leave_file:
message += f',已处理请假记录'
flash(message, 'success')
if error_messages:
for msg in error_messages[:5]: # 只显示前5个错误
flash(msg, 'warning')
else:
flash('导入失败,请检查文件格式和数据', 'error')
for msg in error_messages[:3]:
flash(msg, 'error')
except Exception as e:
flash(f'文件处理失败:{str(e)}', 'error')
print(f"文件处理失败: {str(e)}")
finally:
# 删除临时文件
try:
if attendance_temp_file:
os.unlink(attendance_temp_file.name)
if leave_temp_file:
os.unlink(leave_temp_file.name)
except:
pass
return redirect(url_for('admin.attendance_management'))
else:
flash('请上传Excel文件(.xlsx或.xls)', 'error')
return render_template('admin/upload_attendance.html')
# 添加一个新的路由来删除考勤记录
@admin_bp.route('/attendance/<int:record_id>/delete', methods=['POST'])
@admin_required
def delete_attendance_record(record_id):
"""删除考勤记录"""
try:
record = WeeklyAttendance.query.get_or_404(record_id)
db.session.delete(record)
db.session.commit()
flash('考勤记录删除成功', 'success')
except Exception as e:
db.session.rollback()
flash(f'删除失败: {str(e)}', 'error')
return redirect(url_for('admin.attendance_management'))
@admin_bp.route('/statistics')
@admin_required
def statistics():
"""统计报表"""
from sqlalchemy import desc, func, case, or_, and_
from datetime import datetime, timedelta
from collections import OrderedDict
# 获取筛选参数
search = request.args.get('search', '').strip()
grade_filter = request.args.get('grade', '').strip()
college_filter = request.args.get('college', '').strip()
supervisor_filter = request.args.get('supervisor', '').strip()
start_date = request.args.get('start_date', '')
end_date = request.args.get('end_date', '')
# 构建基础查询
base_query = db.session.query(
Student.student_number,
Student.name,
Student.grade,
Student.college,
Student.supervisor,
Student.degree_type,
Student.enrollment_date,
func.coalesce(func.sum(WeeklyAttendance.actual_work_hours), 0).label('total_work_hours'),
func.coalesce(func.sum(WeeklyAttendance.class_work_hours), 0).label('total_class_hours'),
func.coalesce(func.sum(WeeklyAttendance.overtime_hours), 0).label('total_overtime_hours'),
func.coalesce(func.sum(WeeklyAttendance.absent_days), 0).label('total_absent_days'),
func.count(WeeklyAttendance.record_id).label('attendance_weeks'),
# 计算迟到次数
func.coalesce(
func.sum(
case(
(DailyAttendanceDetail.status.like('%迟到%'), 1),
else_=0
)
), 0
).label('total_late_count')
).outerjoin(
WeeklyAttendance, Student.student_number == WeeklyAttendance.student_number
).outerjoin(
DailyAttendanceDetail, WeeklyAttendance.record_id == DailyAttendanceDetail.weekly_record_id
)
# 应用日期筛选
if start_date:
try:
start_date_obj = datetime.strptime(start_date, '%Y-%m-%d').date()
base_query = base_query.filter(WeeklyAttendance.week_start_date >= start_date_obj)
except ValueError:
flash('开始日期格式错误', 'error')
if end_date:
try:
end_date_obj = datetime.strptime(end_date, '%Y-%m-%d').date()
base_query = base_query.filter(WeeklyAttendance.week_end_date <= end_date_obj)
except ValueError:
flash('结束日期格式错误', 'error')
# 应用学生筛选
if search:
base_query = base_query.filter(or_(
Student.name.contains(search),
Student.student_number.contains(search)
))
if grade_filter:
try:
grade_int = int(grade_filter)
base_query = base_query.filter(Student.grade == grade_int)
except ValueError:
pass
if college_filter:
base_query = base_query.filter(Student.college == college_filter)
if supervisor_filter:
base_query = base_query.filter(Student.supervisor == supervisor_filter)
# 按学生分组
base_query = base_query.group_by(Student.student_id)
# 获取学生统计数据
students_stats = base_query.all()
# 年级映射函数和排序权重
def get_grade_info(grade, degree_type):
"""返回年级标签和排序权重"""
if degree_type in ['学博', '专博']:
# 博士权重为0-99年级越高权重越小优先显示
label = f'博士{grade}年级'
sort_weight = 10 - grade # 博士4年级权重6博士3年级权重7以此类推
else:
# 硕士权重为100-199年级越高权重越小
if grade == 1:
label = '研一'
elif grade == 2:
label = '研二'
elif grade == 3:
label = '研三'
else:
label = f'{grade}'
sort_weight = 110 - grade # 研三权重107研二权重108研一权重109
return label, sort_weight
# 处理学生数据并收集到临时字典
temp_grade_groups = {}
all_students_data = []
for stat in students_stats:
grade_label, sort_weight = get_grade_info(stat.grade, stat.degree_type)
student_data = {
'student_number': stat.student_number,
'name': stat.name,
'grade': stat.grade,
'grade_label': grade_label,
'college': stat.college,
'supervisor': stat.supervisor,
'degree_type': stat.degree_type,
'enrollment_date': stat.enrollment_date,
'total_work_hours': float(stat.total_work_hours),
'total_class_hours': float(stat.total_class_hours),
'total_overtime_hours': float(stat.total_overtime_hours),
'total_absent_days': int(stat.total_absent_days),
'total_late_count': int(stat.total_late_count),
'attendance_weeks': int(stat.attendance_weeks),
'avg_weekly_hours': round(float(stat.total_work_hours) / max(stat.attendance_weeks, 1),
1) if stat.attendance_weeks > 0 else 0
}
all_students_data.append(student_data)
if grade_label not in temp_grade_groups:
temp_grade_groups[grade_label] = {
'students': [],
'sort_weight': sort_weight
}
temp_grade_groups[grade_label]['students'].append(student_data)
# 按出勤时长排序每个年级的学生
for grade in temp_grade_groups:
temp_grade_groups[grade]['students'].sort(key=lambda x: x['total_work_hours'], reverse=True)
# 🔥 按排序权重重新排序年级组,生成有序字典
sorted_grade_items = sorted(temp_grade_groups.items(), key=lambda x: x[1]['sort_weight'])
# 创建最终的有序年级组字典
grade_groups = OrderedDict()
for grade_label, grade_data in sorted_grade_items:
grade_groups[grade_label] = grade_data['students']
# 总体统计
overall_stats = {
'total_students': len(all_students_data),
'total_work_hours': sum(s['total_work_hours'] for s in all_students_data),
'total_absent_days': sum(s['total_absent_days'] for s in all_students_data),
'total_late_count': sum(s['total_late_count'] for s in all_students_data),
'avg_work_hours_per_student': round(
sum(s['total_work_hours'] for s in all_students_data) / max(len(all_students_data), 1), 1)
}
# 🔥 修正月度统计查询
monthly_query = db.session.query(
func.date_format(WeeklyAttendance.week_start_date, '%Y-%m').label('month'),
func.count(WeeklyAttendance.record_id).label('record_count'),
func.sum(WeeklyAttendance.actual_work_hours).label('total_hours'),
func.sum(WeeklyAttendance.absent_days).label('total_absent')
).group_by('month').order_by('month')
# 应用相同的筛选条件到月度统计
if start_date:
try:
start_date_obj = datetime.strptime(start_date, '%Y-%m-%d').date()
monthly_query = monthly_query.filter(WeeklyAttendance.week_start_date >= start_date_obj)
except ValueError:
pass
if end_date:
try:
end_date_obj = datetime.strptime(end_date, '%Y-%m-%d').date()
monthly_query = monthly_query.filter(WeeklyAttendance.week_end_date <= end_date_obj)
except ValueError:
pass
# 如果有学生筛选,需要关联学生表
if search or grade_filter or college_filter or supervisor_filter:
monthly_query = monthly_query.join(
Student, WeeklyAttendance.student_number == Student.student_number
)
if search:
monthly_query = monthly_query.filter(or_(
Student.name.contains(search),
Student.student_number.contains(search)
))
if grade_filter:
try:
grade_int = int(grade_filter)
monthly_query = monthly_query.filter(Student.grade == grade_int)
except ValueError:
pass
if college_filter:
monthly_query = monthly_query.filter(Student.college == college_filter)
if supervisor_filter:
monthly_query = monthly_query.filter(Student.supervisor == supervisor_filter)
monthly_stats = monthly_query.all()
# 🔥 修正按学院统计查询
college_query = db.session.query(
Student.college,
func.count(Student.student_id).label('student_count'),
func.coalesce(func.sum(WeeklyAttendance.actual_work_hours), 0).label('total_hours')
).outerjoin(WeeklyAttendance, Student.student_number == WeeklyAttendance.student_number)
# 应用筛选条件到学院统计
if start_date:
try:
start_date_obj = datetime.strptime(start_date, '%Y-%m-%d').date()
college_query = college_query.filter(
or_(WeeklyAttendance.week_start_date.is_(None),
WeeklyAttendance.week_start_date >= start_date_obj)
)
except ValueError:
pass
if end_date:
try:
end_date_obj = datetime.strptime(end_date, '%Y-%m-%d').date()
college_query = college_query.filter(
or_(WeeklyAttendance.week_end_date.is_(None),
WeeklyAttendance.week_end_date <= end_date_obj)
)
except ValueError:
pass
college_stats = college_query.group_by(Student.college).all()
# 获取筛选选项
colleges = db.session.query(Student.college).distinct().all()
colleges = [c[0] for c in colleges if c[0]]
supervisors = db.session.query(Student.supervisor).distinct().all()
supervisors = [s[0] for s in supervisors if s[0]]
grades = db.session.query(Student.grade).distinct().all()
grades = sorted([g[0] for g in grades if g[0]])
print("=== 调试信息 ===")
print(f"月度统计数据: {monthly_stats}")
print(f"学院统计数据: {college_stats}")
print(f"年级组排序: {list(grade_groups.keys())}")
print("===============")
return render_template('admin/statistics.html',
grade_groups=grade_groups,
all_students_data=all_students_data,
overall_stats=overall_stats,
monthly_stats=monthly_stats,
college_stats=college_stats,
colleges=colleges,
supervisors=supervisors,
grades=grades,
search=search,
selected_grade=grade_filter,
selected_college=college_filter,
selected_supervisor=supervisor_filter,
start_date=start_date,
end_date=end_date)
@admin_bp.route('/statistics/export')
@admin_required
def export_statistics():
"""导出统计数据"""
import pandas as pd
from io import BytesIO
# 获取所有学生统计数据(复用上面的查询逻辑)
students_query = db.session.query(
Student.student_number,
Student.name,
Student.grade,
Student.college,
Student.supervisor,
Student.degree_type,
func.coalesce(func.sum(WeeklyAttendance.actual_work_hours), 0).label('total_work_hours'),
func.coalesce(func.sum(WeeklyAttendance.class_work_hours), 0).label('total_class_hours'),
func.coalesce(func.sum(WeeklyAttendance.overtime_hours), 0).label('total_overtime_hours'),
func.coalesce(func.sum(WeeklyAttendance.absent_days), 0).label('total_absent_days'),
func.count(WeeklyAttendance.record_id).label('attendance_weeks')
).outerjoin(
WeeklyAttendance, Student.student_number == WeeklyAttendance.student_number
).group_by(Student.student_id).all()
# 转换为DataFrame
data = []
for stat in students_query:
grade_label = f'博士{stat.grade}年级' if stat.degree_type in ['学博', '专博'] else f'{stat.grade}'
data.append({
'学号': stat.student_number,
'姓名': stat.name,
'年级': grade_label,
'学院': stat.college,
'导师': stat.supervisor,
'学位类型': stat.degree_type,
'总出勤时长(小时)': float(stat.total_work_hours),
'班内工作时长(小时)': float(stat.total_class_hours),
'加班时长(小时)': float(stat.total_overtime_hours),
'缺勤天数': int(stat.total_absent_days),
'考勤周数': int(stat.attendance_weeks),
'周均工作时长': round(float(stat.total_work_hours) / max(stat.attendance_weeks, 1), 1)
})
df = pd.DataFrame(data)
# 创建Excel文件
output = BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
df.to_excel(writer, sheet_name='学生考勤统计', index=False)
output.seek(0)
filename = f"学生考勤统计_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=filename
)
@admin_bp.route('/students/add', methods=['GET', 'POST'])
@admin_required
def add_student():
"""添加学生"""
if request.method == 'POST':
try:
data = request.get_json() if request.is_json else request.form
# 检查学号是否已存在
if Student.query.filter_by(student_number=data['student_number']).first():
if request.is_json:
return jsonify({'success': False, 'message': '学号已存在'})
else:
flash('学号已存在', 'error')
return render_template('admin/add_student.html')
# 创建用户账户
user = User(
student_number=data['student_number'],
password_hash=generate_password_hash(data.get('password', '123456')),
role='student'
)
success, error = safe_add_and_commit(user)
if not success:
if request.is_json:
return jsonify({'success': False, 'message': f'创建用户失败: {error}'})
else:
flash(f'创建用户失败: {error}', 'error')
return render_template('admin/add_student.html')
# 创建学生记录
student = Student(
student_number=data['student_number'],
name=data['name'],
gender=data['gender'],
grade=int(data['grade']),
phone=data.get('phone', ''),
supervisor=data.get('supervisor', ''),
college=data.get('college', ''),
major=data.get('major', ''),
degree_type=data.get('degree_type') if data.get('degree_type') else None,
status=data.get('status', '在读'),
enrollment_date=datetime.strptime(data['enrollment_date'], '%Y-%m-%d').date() if data.get(
'enrollment_date') else None
)
success, error = safe_add_and_commit(student)
if success:
if request.is_json:
return jsonify({'success': True, 'message': '学生添加成功'})
else:
flash('学生添加成功', 'success')
return redirect(url_for('admin.student_list'))
else:
if request.is_json:
return jsonify({'success': False, 'message': f'添加失败: {error}'})
else:
flash(f'添加失败: {error}', 'error')
except Exception as e:
if request.is_json:
return jsonify({'success': False, 'message': f'添加失败: {str(e)}'})
else:
flash(f'添加失败: {str(e)}', 'error')
return render_template('admin/add_student.html')
@admin_bp.route('/students/<string:student_number>/edit', methods=['GET', 'POST'])
@admin_required
def edit_student(student_number):
"""编辑学生信息"""
student = Student.query.filter_by(student_number=student_number).first_or_404()
if request.method == 'POST':
try:
data = request.get_json() if request.is_json else request.form
# 更新学生信息
student.name = data['name']
student.gender = data['gender']
student.grade = int(data['grade'])
student.phone = data.get('phone', '')
student.supervisor = data.get('supervisor', '')
student.college = data.get('college', '')
student.major = data.get('major', '')
student.degree_type = data.get('degree_type') if data.get('degree_type') else None
student.status = data.get('status', '在读')
if data.get('enrollment_date'):
student.enrollment_date = datetime.strptime(data['enrollment_date'], '%Y-%m-%d').date()
success, error = safe_commit()
if success:
if request.is_json:
return jsonify({'success': True, 'message': '学生信息更新成功'})
else:
flash('学生信息更新成功', 'success')
return redirect(url_for('admin.student_detail', student_number=student_number))
else:
if request.is_json:
return jsonify({'success': False, 'message': f'更新失败: {error}'})
else:
flash(f'更新失败: {error}', 'error')
except Exception as e:
if request.is_json:
return jsonify({'success': False, 'message': f'更新失败: {str(e)}'})
else:
flash(f'更新失败: {str(e)}', 'error')
# GET请求返回学生数据用于编辑
if request.is_json:
return jsonify({
'success': True,
'student': {
'student_number': student.student_number,
'name': student.name,
'gender': student.gender,
'grade': student.grade,
'phone': student.phone,
'supervisor': student.supervisor,
'college': student.college,
'major': student.major,
'degree_type': student.degree_type,
'status': student.status,
'enrollment_date': student.enrollment_date.strftime('%Y-%m-%d') if student.enrollment_date else ''
}
})
return render_template('admin/edit_student.html', student=student)
@admin_bp.route('/students/<string:student_number>/delete', methods=['POST'])
@admin_required
def delete_student(student_number):
"""删除学生"""
try:
# 首先删除相关的考勤记录
WeeklyAttendance.query.filter_by(student_number=student_number).delete()
DailyAttendanceDetail.query.filter_by(student_number=student_number).delete()
LeaveRecord.query.filter_by(student_number=student_number).delete()
# 然后删除学生记录
student = Student.query.filter_by(student_number=student_number).first_or_404()
student_name = student.name
db.session.delete(student)
# 最后删除用户记录
user = User.query.filter_by(student_number=student_number).first()
if user:
db.session.delete(user)
# 提交事务
db.session.commit()
if request.is_json:
return jsonify({'success': True, 'message': f'学生 {student_name} 删除成功'})
else:
flash(f'学生 {student_name} 删除成功', 'success')
return redirect(url_for('admin.student_list'))
except Exception as e:
db.session.rollback()
if request.is_json:
return jsonify({'success': False, 'message': f'删除失败: {str(e)}'})
else:
flash(f'删除失败: {str(e)}', 'error')
return redirect(url_for('admin.student_list'))
@admin_bp.route('/students/batch_action', methods=['POST'])
@admin_required
def batch_action():
"""批量操作学生"""
try:
data = request.get_json()
action = data.get('action')
student_numbers = data.get('student_numbers', [])
if not student_numbers:
return jsonify({'success': False, 'message': '请选择要操作的学生'})
if action == 'delete':
# 批量删除:先删除相关记录
WeeklyAttendance.query.filter(WeeklyAttendance.student_number.in_(student_numbers)).delete(synchronize_session=False)
DailyAttendanceDetail.query.filter(DailyAttendanceDetail.student_number.in_(student_numbers)).delete(synchronize_session=False)
LeaveRecord.query.filter(LeaveRecord.student_number.in_(student_numbers)).delete(synchronize_session=False)
Student.query.filter(Student.student_number.in_(student_numbers)).delete(synchronize_session=False)
User.query.filter(User.student_number.in_(student_numbers)).delete(synchronize_session=False)
db.session.commit()
return jsonify({'success': True, 'message': f'成功删除 {len(student_numbers)} 个学生'})
elif action == 'graduate':
# 批量设为毕业
Student.query.filter(Student.student_number.in_(student_numbers)).update(
{'status': '毕业'}, synchronize_session=False
)
db.session.commit()
return jsonify({'success': True, 'message': f'成功将 {len(student_numbers)} 个学生设为毕业状态'})
else:
return jsonify({'success': False, 'message': '无效的操作'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'操作失败: {str(e)}'})
@admin_bp.route('/students/<string:student_number>/reset_password', methods=['POST'])
@admin_required
def reset_student_password(student_number):
"""重置学生密码"""
try:
user = User.query.filter_by(student_number=student_number).first_or_404()
# 重置为默认密码
new_password = request.get_json().get('password', '123456') if request.is_json else '123456'
user.password_hash = generate_password_hash(new_password)
success, error = safe_commit()
if success:
if request.is_json:
return jsonify({'success': True, 'message': '密码重置成功'})
else:
flash('密码重置成功', 'success')
return redirect(url_for('admin.student_detail', student_number=student_number))
else:
if request.is_json:
return jsonify({'success': False, 'message': f'重置失败: {error}'})
else:
flash(f'重置失败: {error}', 'error')
except Exception as e:
if request.is_json:
return jsonify({'success': False, 'message': f'重置失败: {str(e)}'})
else:
flash(f'重置失败: {str(e)}', 'error')
return redirect(url_for('admin.student_detail', student_number=student_number))
@admin_bp.route('/students/<string:student_number>/toggle_status', methods=['POST'])
@admin_required
def toggle_student_status(student_number):
"""切换学生账户状态"""
try:
user = User.query.filter_by(student_number=student_number).first_or_404()
user.is_active = not user.is_active
success, error = safe_commit()
if success:
status_text = '启用' if user.is_active else '禁用'
if request.is_json:
return jsonify({'success': True, 'message': f'账户{status_text}成功'})
else:
flash(f'账户{status_text}成功', 'success')
else:
if request.is_json:
return jsonify({'success': False, 'message': f'操作失败: {error}'})
else:
flash(f'操作失败: {error}', 'error')
except Exception as e:
if request.is_json:
return jsonify({'success': False, 'message': f'操作失败: {str(e)}'})
else:
flash(f'操作失败: {str(e)}', 'error')
return redirect(url_for('admin.student_detail', student_number=student_number))
@admin_bp.route('/attendance/<int:record_id>/details')
@admin_required
def attendance_record_details(record_id):
"""查看考勤记录详情"""
from datetime import datetime, timedelta
import json
# 获取周考勤汇总记录
weekly_record = WeeklyAttendance.query.get_or_404(record_id)
# 获取学生信息
student = Student.query.filter_by(student_number=weekly_record.student_number).first()
# 获取该周的每日考勤明细
daily_details = DailyAttendanceDetail.query.filter_by(
weekly_record_id=record_id
).order_by(DailyAttendanceDetail.attendance_date).all()
# 🔥 新增:计算真实的缺勤次数和迟到次数
def calculate_real_counts(daily_details_list):
"""计算真实的缺勤次数和迟到次数"""
late_count = 0
absent_count = 0
for detail in daily_details_list:
# 🔥 处理完全缺勤的情况
if detail.status == '缺勤' and (not detail.remarks or not detail.remarks.startswith('{')):
# 完全缺勤的天数,早上+下午都缺勤
absent_count += 2 # 早上缺勤1次 + 下午缺勤1次
print(f" 完全缺勤日期 {detail.attendance_date}: +2次")
continue
if detail.remarks and detail.remarks.startswith('{'):
try:
remarks_data = json.loads(detail.remarks)
details_info = remarks_data.get('details', {})
# 统计迟到次数
for period in ['morning', 'afternoon', 'evening']:
period_data = details_info.get(period, {})
if period_data.get('status') == 'late':
late_count += 1
# 统计缺勤次数
# 检查早上缺勤morning_in AND morning_out 都missing
morning_data = details_info.get('morning', {})
morning_in_time = morning_data.get('in')
morning_out_time = morning_data.get('out')
morning_in_status = morning_data.get('status', 'missing')
if ((not morning_in_time or morning_in_status == 'missing') and
(not morning_out_time)):
absent_count += 1
print(f" 部分缺勤日期 {detail.attendance_date}: 早上缺勤 +1次")
# 检查下午缺勤afternoon_in AND afternoon_out 都missing
afternoon_data = details_info.get('afternoon', {})
afternoon_in_time = afternoon_data.get('in')
afternoon_out_time = afternoon_data.get('out')
afternoon_in_status = afternoon_data.get('status', 'missing')
if ((not afternoon_in_time or afternoon_in_status == 'missing') and
(not afternoon_out_time)):
absent_count += 1
print(f" 部分缺勤日期 {detail.attendance_date}: 下午缺勤 +1次")
except (json.JSONDecodeError, KeyError, AttributeError):
continue
print(f" 详情页面计算结果 - 迟到次数: {late_count}, 缺勤次数: {absent_count}")
return late_count, absent_count
# 🔥 计算真实的次数
real_late_count, real_absent_count = calculate_real_counts(daily_details)
# 🔥 将计算结果添加到weekly_record对象中
weekly_record.late_count = real_late_count
weekly_record.absent_count = real_absent_count
# 处理每日详情,计算工作时长和解析详细信息
processed_daily_details = []
for detail in daily_details:
processed_detail = {
'detail_id': detail.detail_id,
'attendance_date': detail.attendance_date,
'status': detail.status,
'check_in_time': detail.check_in_time,
'check_out_time': detail.check_out_time,
'remarks': detail.remarks,
'duration_hours': None,
'detailed_info': None
}
# 计算工作时长
if detail.check_in_time and detail.check_out_time:
try:
# 创建完整的datetime对象
start_datetime = datetime.combine(detail.attendance_date, detail.check_in_time)
end_datetime = datetime.combine(detail.attendance_date, detail.check_out_time)
# 如果结束时间小于开始时间,说明跨天了
if end_datetime < start_datetime:
end_datetime += timedelta(days=1)
duration = (end_datetime - start_datetime).total_seconds() / 3600
processed_detail['duration_hours'] = round(duration, 1)
except Exception as e:
print(f"计算工作时长失败: {e}")
processed_detail['duration_hours'] = None
# 解析详细信息
if detail.remarks:
try:
if detail.remarks.startswith('{'):
remarks_data = json.loads(detail.remarks)
processed_detail['detailed_info'] = remarks_data.get('details')
processed_detail['summary_remarks'] = remarks_data.get('summary', detail.remarks)
else:
processed_detail['summary_remarks'] = detail.remarks
except:
processed_detail['summary_remarks'] = detail.remarks
processed_daily_details.append(processed_detail)
# 计算统计数据
total_days = len(processed_daily_details)
present_days = len([d for d in processed_daily_details if d['status'] == '正常'])
late_days = len([d for d in processed_daily_details if '迟到' in d['status']])
absent_days = len([d for d in processed_daily_details if d['status'] == '缺勤'])
# 计算平均每日工作时长
if processed_daily_details:
avg_daily_hours = weekly_record.actual_work_hours / max(present_days, 1)
else:
avg_daily_hours = 0
# 获取该学生的历史考勤记录(用于对比)
historical_records = WeeklyAttendance.query.filter_by(
student_number=weekly_record.student_number
).filter(WeeklyAttendance.record_id != record_id).order_by(
desc(WeeklyAttendance.week_start_date)
).limit(5).all()
return render_template('admin/attendance_details.html',
weekly_record=weekly_record,
student=student,
daily_details=processed_daily_details,
total_days=total_days,
present_days=present_days,
late_days=late_days,
absent_days=absent_days,
avg_daily_hours=avg_daily_hours,
historical_records=historical_records)
@admin_bp.route('/attendance/<int:record_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_attendance_record(record_id):
"""编辑考勤记录"""
weekly_record = WeeklyAttendance.query.get_or_404(record_id)
if request.method == 'POST':
try:
data = request.get_json() if request.is_json else request.form
# 更新周考勤记录
weekly_record.actual_work_hours = float(data.get('actual_work_hours', 0))
weekly_record.class_work_hours = float(data.get('class_work_hours', 0))
weekly_record.absent_days = int(data.get('absent_days', 0))
weekly_record.overtime_hours = float(data.get('overtime_hours', 0))
success, error = safe_commit()
if success:
flash('考勤记录更新成功', 'success')
return redirect(url_for('admin.attendance_record_details', record_id=record_id))
else:
flash(f'更新失败: {error}', 'error')
except Exception as e:
flash(f'更新失败: {str(e)}', 'error')
return render_template('admin/edit_attendance_record.html', weekly_record=weekly_record)
def export_attendance_data():
"""导出考勤数据到Excel"""
from sqlalchemy import desc, func, case, or_
from io import BytesIO
try:
# 获取筛选参数
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
student_search = request.args.get('student_search', '').strip()
sort_by = request.args.get('sort_by', 'week_start_date_desc')
# 构建查询
query = db.session.query(
WeeklyAttendance,
func.coalesce(
func.sum(
case(
(DailyAttendanceDetail.status.like('%迟到%'), 1),
else_=0
)
), 0
).label('late_count')
).outerjoin(
DailyAttendanceDetail,
WeeklyAttendance.record_id == DailyAttendanceDetail.weekly_record_id
).group_by(WeeklyAttendance.record_id)
# 应用筛选条件
if start_date:
try:
start_date_obj = datetime.strptime(start_date, '%Y-%m-%d').date()
query = query.filter(WeeklyAttendance.week_start_date >= start_date_obj)
except ValueError:
pass
if end_date:
try:
end_date_obj = datetime.strptime(end_date, '%Y-%m-%d').date()
query = query.filter(WeeklyAttendance.week_end_date <= end_date_obj)
except ValueError:
pass
if student_search:
query = query.filter(or_(
WeeklyAttendance.name.contains(student_search),
WeeklyAttendance.student_number.contains(student_search)
))
# 应用排序
if sort_by and '_' in sort_by:
field, direction = sort_by.rsplit('_', 1)
if direction not in ['asc', 'desc']:
direction = 'desc'
if field == 'actual_work_hours':
if direction == 'desc':
query = query.order_by(desc(WeeklyAttendance.actual_work_hours))
else:
query = query.order_by(WeeklyAttendance.actual_work_hours)
elif field == 'week_start_date':
if direction == 'desc':
query = query.order_by(desc(WeeklyAttendance.week_start_date))
else:
query = query.order_by(WeeklyAttendance.week_start_date)
else:
query = query.order_by(desc(WeeklyAttendance.week_start_date))
else:
query = query.order_by(desc(WeeklyAttendance.week_start_date))
# 获取所有记录
results = query.all()
if not results:
flash('没有数据可导出', 'warning')
args = request.args.copy()
args.pop('export', None)
return redirect(url_for('admin.attendance_management', **args))
# 准备数据
data = []
for record, late_count in results:
data.append({
'学号': record.student_number,
'姓名': record.name,
'周开始日期': record.week_start_date.strftime('%Y-%m-%d'),
'周结束日期': record.week_end_date.strftime('%Y-%m-%d'),
'实际出勤时长(小时)': float(record.actual_work_hours),
'班内工作时长(小时)': float(record.class_work_hours),
'旷工天数': int(record.absent_days),
'迟到次数': int(late_count) if late_count else 0,
'加班时长(小时)': float(record.overtime_hours),
'记录创建时间': record.created_at.strftime('%Y-%m-%d %H:%M:%S')
})
# 创建DataFrame
df = pd.DataFrame(data)
# 创建Excel文件
output = BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
df.to_excel(writer, sheet_name='考勤记录', index=False)
# 调整列宽
workbook = writer.book
worksheet = writer.sheets['考勤记录']
for column in worksheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 30)
worksheet.column_dimensions[column_letter].width = adjusted_width
output.seek(0)
# 生成文件名
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"考勤记录_{timestamp}.xlsx"
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=filename
)
except Exception as e:
flash(f'导出失败: {str(e)}', 'error')
# 移除export参数重定向到正常页面
args = request.args.copy()
args.pop('export', None)
return redirect(url_for('admin.attendance_management', **args))
@admin_bp.route('/students/batch_import', methods=['POST'])
@admin_required
def batch_import_students():
"""批量导入学生"""
try:
if 'excel_file' not in request.files:
return jsonify({'success': False, 'message': '请上传Excel文件'})
file = request.files['excel_file']
if file.filename == '':
return jsonify({'success': False, 'message': '请选择文件'})
if not file.filename.lower().endswith(('.xlsx', '.xls')):
return jsonify({'success': False, 'message': '请上传Excel文件.xlsx或.xls'})
overwrite = request.form.get('overwrite_existing') == 'on'
# 使用临时文件处理上传的Excel
import tempfile
import os
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx')
file.save(temp_file.name)
temp_file.close()
try:
# 解析Excel文件
import pandas as pd
df = pd.read_excel(temp_file.name)
print("Excel文件列名", df.columns.tolist())
print("数据行数:", len(df))
# 验证必要的列
required_columns = ['姓名', '性别', '年级', '学号']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
return jsonify({
'success': False,
'message': f'Excel文件缺少必要的列: {", ".join(missing_columns)}'
})
success_count = 0
error_count = 0
skip_count = 0
details = []
# 遍历每行数据
for index, row in df.iterrows():
try:
# 提取数据
name = str(row['姓名']).strip() if pd.notna(row['姓名']) else ''
gender = str(row['性别']).strip() if pd.notna(row['性别']) else ''
grade = row['年级'] if pd.notna(row['年级']) else None
student_number = str(row['学号']).strip() if pd.notna(row['学号']) else ''
phone = str(row['手机号']).strip() if pd.notna(row.get('手机号', '')) and str(row.get('手机号', '')) != 'nan' else ''
supervisor = str(row['导师']).strip() if pd.notna(row.get('导师', '')) else ''
college = str(row['学院']).strip() if pd.notna(row.get('学院', '')) else ''
major = str(row['专业']).strip() if pd.notna(row.get('专业', '')) else ''
degree_type = str(row['学位类型']).strip() if pd.notna(row.get('学位类型', '')) else ''
# 验证必填字段
if not all([name, gender, student_number]) or grade is None:
details.append({
'name': name or '未知',
'student_number': student_number or '未知',
'status': 'error',
'message': '缺少必填字段'
})
error_count += 1
continue
# 验证年级格式
try:
grade = int(grade)
if grade < 20 or grade > 30:
raise ValueError("年级应在20-30之间")
except (ValueError, TypeError):
details.append({
'name': name,
'student_number': student_number,
'status': 'error',
'message': f'年级格式错误: {grade}'
})
error_count += 1
continue
# 验证性别
if gender not in ['', '']:
details.append({
'name': name,
'student_number': student_number,
'status': 'error',
'message': f'性别格式错误: {gender}'
})
error_count += 1
continue
# 检查学号是否已存在
existing_student = Student.query.filter_by(student_number=student_number).first()
if existing_student:
if not overwrite:
details.append({
'name': name,
'student_number': student_number,
'status': 'skip',
'message': '学号已存在,跳过'
})
skip_count += 1
continue
else:
# 更新现有学生信息
existing_student.name = name
existing_student.gender = gender
existing_student.grade = grade
existing_student.phone = phone
existing_student.supervisor = supervisor
existing_student.college = college
existing_student.major = major
existing_student.degree_type = degree_type if degree_type else None
details.append({
'name': name,
'student_number': student_number,
'status': 'success',
'message': '更新成功'
})
success_count += 1
continue
# 创建用户账户
user = User(
student_number=student_number,
password_hash=generate_password_hash('123456'),
role='student'
)
db.session.add(user)
db.session.flush() # 获取用户ID
# 创建学生记录
student = Student(
student_number=student_number,
name=name,
gender=gender,
grade=grade,
phone=phone,
supervisor=supervisor,
college=college,
major=major,
degree_type=degree_type if degree_type else None,
status='在读'
)
db.session.add(student)
details.append({
'name': name,
'student_number': student_number,
'status': 'success',
'message': '创建成功'
})
success_count += 1
except Exception as e:
details.append({
'name': name if 'name' in locals() else '未知',
'student_number': student_number if 'student_number' in locals() else '未知',
'status': 'error',
'message': f'处理失败: {str(e)}'
})
error_count += 1
continue
# 提交事务
db.session.commit()
return jsonify({
'success': True,
'success_count': success_count,
'error_count': error_count,
'skip_count': skip_count,
'details': details
})
finally:
# 删除临时文件
try:
os.unlink(temp_file.name)
except:
pass
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': f'导入失败: {str(e)}'})
@admin_bp.route('/students/download_template')
@admin_required
def download_student_template():
"""下载学生信息Excel模板"""
import pandas as pd
from io import BytesIO
# 创建模板数据
template_data = {
'姓名': ['张三', '李四', '王五'],
'性别': ['', '', ''],
'年级': [25, 25, 24],
'学号': ['23320251154001', '23320251154002', '23320241154003'],
'手机号': ['13800138001', '13800138002', '13800138003'],
'导师': ['张教授', '李教授', '王教授'],
'学院': ['信息学院', '信息学院', '计算机学院'],
'专业': ['人工智能', '通信工程', '计算机科学'],
'学位类型': ['专硕', '学硕', '专硕'],
'备注': ['', '', '']
}
df = pd.DataFrame(template_data)
# 创建Excel文件
output = BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
df.to_excel(writer, sheet_name='学生信息模板', index=False)
# 获取工作表并设置列宽
workbook = writer.book
worksheet = writer.sheets['学生信息模板']
# 设置列宽
column_widths = {
'A': 10, # 姓名
'B': 6, # 性别
'C': 8, # 年级
'D': 18, # 学号
'E': 15, # 手机号
'F': 12, # 导师
'G': 15, # 学院
'H': 15, # 专业
'I': 10, # 学位类型
'J': 10 # 备注
}
for col, width in column_widths.items():
worksheet.column_dimensions[col].width = width
# 设置表头格式
from openpyxl.styles import Font, PatternFill, Alignment
header_font = Font(bold=True, color='FFFFFF')
header_fill = PatternFill(start_color='4472C4', end_color='4472C4', fill_type='solid')
header_alignment = Alignment(horizontal='center', vertical='center')
for cell in worksheet[1]:
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
output.seek(0)
filename = f"学生信息导入模板_{datetime.now().strftime('%Y%m%d')}.xlsx"
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=filename
)