superlishunqin 3e6c8d353c SmartDSP
2025-06-12 00:38:27 +08:00

1326 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, send_file
from flask_login import login_required, current_user
from app.models import db, User, Student, WeeklyAttendance, DailyAttendanceDetail, LeaveRecord
from app.utils.auth_helpers import admin_required
from app.utils.database import safe_add_and_commit, safe_commit, safe_delete_and_commit
from datetime import datetime, timedelta
from sqlalchemy import and_, or_, desc, func
import pandas as pd
import io
import re
from werkzeug.security import generate_password_hash
from app.utils.attendance_importer import AttendanceDataImporter
from werkzeug.utils import secure_filename
import os
import tempfile
# Blueprint registered under the name 'admin'; the route handlers in this
# module attach to it and are addressed as 'admin.<endpoint>' in url_for().
admin_bp = Blueprint('admin', __name__)
@admin_bp.route('/dashboard')
@admin_required
def dashboard():
    """Render the administrator dashboard.

    Collects the headline counters (students, weekly attendance records,
    pending leave requests), the number of weekly records whose week started
    within the last seven days, student head counts per college and for the
    ten largest supervisor groups, and the five newest pending leave
    requests, then passes them to ``admin/dashboard.html``.
    """
    pending = '待审批'

    # Headline counters.
    student_total = Student.query.count()
    attendance_total = WeeklyAttendance.query.count()
    pending_total = LeaveRecord.query.filter_by(status=pending).count()

    # Weekly records that started during the last seven days.
    cutoff = datetime.now().date() - timedelta(days=7)
    recent_total = (
        WeeklyAttendance.query
        .filter(WeeklyAttendance.week_start_date >= cutoff)
        .count()
    )

    # Head count per college.
    by_college = (
        db.session.query(
            Student.college,
            func.count(Student.student_id).label('count'),
        )
        .group_by(Student.college)
        .all()
    )

    # Ten supervisors with the most students.
    by_supervisor = (
        db.session.query(
            Student.supervisor,
            func.count(Student.student_id).label('count'),
        )
        .group_by(Student.supervisor)
        .order_by(desc('count'))
        .limit(10)
        .all()
    )

    # Newest leave requests still awaiting approval.
    newest_pending = (
        LeaveRecord.query
        .filter_by(status=pending)
        .order_by(desc(LeaveRecord.created_at))
        .limit(5)
        .all()
    )

    context = {
        'total_students': student_total,
        'total_attendance_records': attendance_total,
        'pending_leaves': pending_total,
        'recent_records': recent_total,
        'college_stats': by_college,
        'supervisor_stats': by_supervisor,
        'recent_leaves': newest_pending,
    }
    return render_template('admin/dashboard.html', **context)
@admin_bp.route('/students')
@admin_required
def student_list():
    """List students, 20 per page, with optional search and filters.

    Query-string parameters: ``page``, ``search`` (substring of name or
    student number), ``college``, ``supervisor`` and ``grade``.  A ``grade``
    value that is not an integer is ignored.  The distinct colleges,
    supervisors and grades are also loaded to populate the filter controls.
    """
    params = request.args
    page = params.get('page', 1, type=int)
    per_page = 20

    search = params.get('search', '').strip()
    college = params.get('college', '').strip()
    supervisor = params.get('supervisor', '').strip()
    grade = params.get('grade', '', type=str)

    # Collect the active criteria, then apply them in one go.
    criteria = []
    if search:
        criteria.append(or_(
            Student.name.contains(search),
            Student.student_number.contains(search),
        ))
    if college:
        criteria.append(Student.college == college)
    if supervisor:
        criteria.append(Student.supervisor == supervisor)
    if grade:
        try:
            criteria.append(Student.grade == int(grade))
        except ValueError:
            pass  # non-numeric grade: no grade filter is applied

    query = Student.query
    if criteria:
        query = query.filter(*criteria)

    pagination = query.order_by(Student.student_number).paginate(
        page=page, per_page=per_page, error_out=False
    )

    def distinct_values(column):
        """Return the distinct truthy values stored in *column*."""
        return [row[0] for row in db.session.query(column).distinct().all() if row[0]]

    colleges = distinct_values(Student.college)
    supervisors = distinct_values(Student.supervisor)
    grades = sorted(distinct_values(Student.grade))

    return render_template(
        'admin/student_list.html',
        students=pagination.items,
        pagination=pagination,
        colleges=colleges,
        supervisors=supervisors,
        grades=grades,
        search=search,
        selected_college=college,
        selected_supervisor=supervisor,
        selected_grade=grade,
    )
@admin_bp.route('/students/<student_number>')
@admin_required
def student_detail(student_number):
    """Show one student's profile with recent attendance and leave history.

    Responds with 404 when no student has the given number.  The page gets
    the ten most recent weekly attendance records, the ten most recent leave
    records, and the totals of worked hours and absent days across all of the
    student's weekly records.
    """
    student = Student.query.filter_by(student_number=student_number).first_or_404()

    # Ten most recent weekly attendance records.
    recent_attendance = (
        WeeklyAttendance.query
        .filter_by(student_number=student_number)
        .order_by(desc(WeeklyAttendance.week_start_date))
        .limit(10)
        .all()
    )

    # Ten most recent leave records.
    recent_leaves = (
        LeaveRecord.query
        .filter_by(student_number=student_number)
        .order_by(desc(LeaveRecord.created_at))
        .limit(10)
        .all()
    )

    def weekly_total(column):
        """Sum *column* over all of the student's weekly records (0 if none)."""
        summed = (
            db.session.query(func.sum(column))
            .filter_by(student_number=student_number)
            .scalar()
        )
        return summed or 0

    hours_total = weekly_total(WeeklyAttendance.actual_work_hours)
    absent_total = weekly_total(WeeklyAttendance.absent_days)

    return render_template(
        'admin/student_detail.html',
        student=student,
        attendance_records=recent_attendance,
        leave_records=recent_leaves,
        total_work_hours=float(hours_total),
        total_absent_days=int(absent_total),
    )
@admin_bp.route('/attendance')
@admin_required
def attendance_management():
    """List weekly attendance records with filtering, sorting and export.

    Query-string parameters:
        export: when equal to ``excel`` the request is delegated to
            ``export_attendance_data()``.
        page: page number (50 records per page).
        start_date / end_date: ``YYYY-MM-DD`` bounds applied to
            ``week_start_date`` / ``week_end_date``; a malformed value is
            reported with a flash message and otherwise ignored.
        student_search: substring matched against name or student number.
        sort_by: ``<field>_<asc|desc>``; unknown fields fall back to
            ``week_start_date`` descending, unknown directions to ``desc``.

    Each listed record gets a ``late_count`` attribute holding the number of
    its daily details whose status contains '迟到'.  The ``statistics``
    template variable carries the number of '请假' daily details belonging to
    the records on the current page (``None`` when the page is empty).
    """
    import logging
    from sqlalchemy import case

    log = logging.getLogger(__name__)

    # ---- Excel export ----
    if request.args.get('export') == 'excel':
        try:
            return export_attendance_data()
        except Exception as e:
            flash(f'导出失败: {str(e)}', 'error')
            # Drop the export flag and return to the normal listing with the
            # remaining filters; to_dict() yields one plain value per key.
            remaining_args = request.args.to_dict()
            remaining_args.pop('export', None)
            return redirect(url_for('admin.attendance_management', **remaining_args))

    page = request.args.get('page', 1, type=int)
    per_page = 50

    start_date = request.args.get('start_date')
    end_date = request.args.get('end_date')
    student_search = request.args.get('student_search', '').strip()
    sort_by = request.args.get('sort_by', 'week_start_date_desc')

    # Weekly record plus the number of "late" daily details attached to it.
    late_count_expr = func.coalesce(
        func.sum(
            case(
                (DailyAttendanceDetail.status.like('%迟到%'), 1),
                else_=0
            )
        ), 0
    ).label('late_count')
    query = db.session.query(WeeklyAttendance, late_count_expr).outerjoin(
        DailyAttendanceDetail,
        WeeklyAttendance.record_id == DailyAttendanceDetail.weekly_record_id
    ).group_by(WeeklyAttendance.record_id)

    # ---- Filters ----
    if start_date:
        try:
            start_date_obj = datetime.strptime(start_date, '%Y-%m-%d').date()
            query = query.filter(WeeklyAttendance.week_start_date >= start_date_obj)
        except ValueError:
            flash('开始日期格式错误', 'error')
    if end_date:
        try:
            end_date_obj = datetime.strptime(end_date, '%Y-%m-%d').date()
            query = query.filter(WeeklyAttendance.week_end_date <= end_date_obj)
        except ValueError:
            flash('结束日期格式错误', 'error')
    if student_search:
        query = query.filter(or_(
            WeeklyAttendance.name.contains(student_search),
            WeeklyAttendance.student_number.contains(student_search)
        ))

    # ---- Sorting ----
    # 'late_count' is ordered by its label because it is a computed column.
    sortable = {
        'actual_work_hours': WeeklyAttendance.actual_work_hours,
        'class_work_hours': WeeklyAttendance.class_work_hours,
        'absent_days': WeeklyAttendance.absent_days,
        'overtime_hours': WeeklyAttendance.overtime_hours,
        'late_count': 'late_count',
        'created_at': WeeklyAttendance.created_at,
        'week_start_date': WeeklyAttendance.week_start_date,
    }
    default_order = desc(WeeklyAttendance.week_start_date)
    ordering = default_order
    if sort_by and '_' in sort_by:
        field, direction = sort_by.rsplit('_', 1)
        if direction not in ('asc', 'desc'):
            direction = 'desc'
        target = sortable.get(field)
        if target is not None:
            ordering = desc(target) if direction == 'desc' else target
    query = query.order_by(ordering)

    # ---- Pagination ----
    try:
        pagination = query.paginate(
            page=page,
            per_page=per_page,
            error_out=False
        )
    except Exception:
        log.exception('查询分页失败')
        flash('查询数据时出现错误', 'error')
        return redirect(url_for('admin.dashboard'))

    # Each row is (record, late_count); expose the count on the record itself
    # so the template can treat the page as a plain list of records.
    attendance_records = []
    for record, late_count in pagination.items:
        record.late_count = int(late_count) if late_count else 0
        attendance_records.append(record)
    pagination.items = attendance_records

    # ---- Leave statistics for the records on this page ----
    statistics = None
    if attendance_records:
        record_ids = [record.record_id for record in attendance_records]
        leave_count = DailyAttendanceDetail.query.filter(
            DailyAttendanceDetail.weekly_record_id.in_(record_ids),
            DailyAttendanceDetail.status == '请假'
        ).count()
        statistics = {
            'total_leave_days': leave_count
        }

    return render_template('admin/attendance_management.html',
                           attendance_records=attendance_records,
                           pagination=pagination,
                           start_date=start_date,
                           end_date=end_date,
                           student_search=student_search,
                           sort_by=sort_by,
                           statistics=statistics)
@admin_bp.route('/upload/attendance', methods=['GET', 'POST'])
@admin_required
def upload_attendance():
    """Upload a weekly attendance workbook, optionally with a leave sheet.

    GET renders the upload form.  POST expects:
        attendance_file: required Excel file (``.xlsx`` / ``.xls``).
        leave_file: optional Excel file with leave records.
        week_start, week_end: required form fields, forwarded unchanged to
            the importer.

    The uploads are written to temporary files, parsed and imported through
    ``AttendanceDataImporter``; progress and errors are reported via flash
    messages.  Once processing has been attempted the user is redirected to
    the attendance list; validation failures re-render the form instead.
    """
    import logging

    # No module-level ``logger`` is visible among this file's imports, so the
    # handler obtains its own instead of relying on one existing.
    log = logging.getLogger(__name__)
    form_template = 'admin/upload_attendance.html'
    excel_suffixes = ('.xlsx', '.xls')

    if request.method != 'POST':
        return render_template(form_template)

    # ---- Validate the submission ----
    attendance_file = request.files.get('attendance_file')
    if attendance_file is None or not attendance_file.filename:
        flash('请选择考勤记录文件', 'error')
        return render_template(form_template)

    leave_file = request.files.get('leave_file')  # optional
    has_leave_file = leave_file is not None and bool(leave_file.filename)

    week_start = request.form.get('week_start')
    week_end = request.form.get('week_end')
    if not week_start or not week_end:
        flash('请选择周开始和结束日期', 'error')
        return render_template(form_template)

    # Compare extensions case-insensitively so e.g. "DATA.XLSX" is accepted.
    if not attendance_file.filename.lower().endswith(excel_suffixes):
        flash('请上传Excel文件(.xlsx或.xls)', 'error')
        return render_template(form_template)

    # ---- Stage uploads on disk, import, and always clean up ----
    temp_paths = []

    def stage(upload):
        """Write *upload* to a fresh temporary file and return its path."""
        handle = tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx')
        # Register the path before saving so it is removed even if save() fails.
        temp_paths.append(handle.name)
        handle.close()
        upload.save(handle.name)
        return handle.name

    try:
        attendance_path = stage(attendance_file)
        leave_path = None
        # A leave file with a non-Excel extension is ignored.
        if has_leave_file and leave_file.filename.lower().endswith(excel_suffixes):
            leave_path = stage(leave_file)

        importer = AttendanceDataImporter()
        attendance_data = importer.parse_xlsx_file(attendance_path)

        # The leave sheet is best-effort: a parse failure only raises a warning.
        leave_data = None
        if leave_path:
            try:
                leave_data = importer.parse_leave_file(leave_path)
                flash(f'成功解析请假记录 {len(leave_data)}', 'info')
            except Exception as e:
                flash(f'请假单解析失败:{str(e)}', 'warning')
                leave_data = None

        if leave_data:
            attendance_data = importer.apply_leave_records(
                attendance_data, leave_data, week_start, week_end)

        success_count, error_count, error_messages = importer.import_to_database(
            attendance_data, week_start, week_end)

        # Leave records are additionally stored in their own table.
        if leave_data:
            leave_success_count = importer.import_leave_records_to_database(leave_data)
            flash(f'请假记录导入:{leave_success_count}', 'info')

        if success_count > 0:
            message = f'导入完成:成功 {success_count} 条,失败 {error_count}'
            if has_leave_file:
                message += ',已处理请假记录'
            flash(message, 'success')
            for msg in error_messages[:5]:  # show at most five errors
                flash(msg, 'warning')
        else:
            flash('导入失败,请检查文件格式和数据', 'error')
            for msg in error_messages[:3]:
                flash(msg, 'error')
    except Exception as e:
        flash(f'文件处理失败:{str(e)}', 'error')
        log.error(f"文件处理失败: {str(e)}", exc_info=True)
    finally:
        # Remove each temporary file independently so that one failed unlink
        # does not leave the other file behind.
        for path in temp_paths:
            try:
                os.unlink(path)
            except OSError:
                log.warning('Could not remove temporary upload %s', path, exc_info=True)

    return redirect(url_for('admin.attendance_management'))
# Route for deleting a single weekly attendance record.
@admin_bp.route('/attendance/<int:record_id>/delete', methods=['POST'])
@admin_required
def delete_attendance_record(record_id):
    """Delete one weekly attendance record and return to the attendance list.

    Responds with 404 when the record does not exist.  A database failure is
    rolled back and reported through a flash message.
    """
    # Looked up outside the try block: get_or_404 aborts by raising an HTTP
    # exception, which the broad handler below would otherwise swallow and
    # turn into a "delete failed" message instead of a 404 response.
    record = WeeklyAttendance.query.get_or_404(record_id)
    try:
        db.session.delete(record)
        db.session.commit()
        flash('考勤记录删除成功', 'success')
    except Exception as e:
        db.session.rollback()
        flash(f'删除失败: {str(e)}', 'error')
    return redirect(url_for('admin.attendance_management'))
@admin_bp.route('/statistics')
@admin_required
def statistics():
    """Render the statistics report: per-student, monthly and per-college.

    Query-string parameters: ``search`` (substring of name or student
    number), ``grade``, ``college``, ``supervisor``, ``start_date`` and
    ``end_date`` (``YYYY-MM-DD``).  A malformed date is reported once via a
    flash message and then ignored; a non-numeric grade is ignored.

    Template variables:
        grade_groups: ordered mapping of grade label -> students, doctoral
            groups first and higher grades first within each degree level;
            students inside a group are sorted by total hours, descending.
        all_students_data: flat list of the same per-student dicts.
        overall_stats: totals and per-student average over that list.
        monthly_stats: rows of (month, record_count, total_hours,
            total_absent) grouped by the month of ``week_start_date``.
        college_stats: rows of (college, student_count, total_hours).
    """
    from collections import OrderedDict
    from sqlalchemy import case

    # ---- Request parameters (parsed once, reused by every query) ----
    search = request.args.get('search', '').strip()
    grade_filter = request.args.get('grade', '').strip()
    college_filter = request.args.get('college', '').strip()
    supervisor_filter = request.args.get('supervisor', '').strip()
    start_date = request.args.get('start_date', '')
    end_date = request.args.get('end_date', '')

    def parse_day(raw, error_message):
        """Return *raw* as a date, or None (after flashing) if unusable."""
        if not raw:
            return None
        try:
            return datetime.strptime(raw, '%Y-%m-%d').date()
        except ValueError:
            flash(error_message, 'error')
            return None

    start_date_obj = parse_day(start_date, '开始日期格式错误')
    end_date_obj = parse_day(end_date, '结束日期格式错误')

    grade_int = None
    if grade_filter:
        try:
            grade_int = int(grade_filter)
        except ValueError:
            pass  # non-numeric grade: no grade filter is applied

    # Student-level criteria shared by the per-student and monthly queries.
    student_criteria = []
    if search:
        student_criteria.append(or_(
            Student.name.contains(search),
            Student.student_number.contains(search)
        ))
    if grade_int is not None:
        student_criteria.append(Student.grade == grade_int)
    if college_filter:
        student_criteria.append(Student.college == college_filter)
    if supervisor_filter:
        student_criteria.append(Student.supervisor == supervisor_filter)

    # ---- Per-student totals ----
    # Late counts are pre-aggregated per weekly record.  Joining the daily
    # details row-by-row would repeat every weekly record once per daily
    # detail and multiply the summed hours, absences and week count.
    late_per_week = db.session.query(
        DailyAttendanceDetail.weekly_record_id.label('weekly_record_id'),
        func.sum(
            case(
                (DailyAttendanceDetail.status.like('%迟到%'), 1),
                else_=0
            )
        ).label('late_count')
    ).group_by(DailyAttendanceDetail.weekly_record_id).subquery()

    base_query = db.session.query(
        Student.student_number,
        Student.name,
        Student.grade,
        Student.college,
        Student.supervisor,
        Student.degree_type,
        Student.enrollment_date,
        func.coalesce(func.sum(WeeklyAttendance.actual_work_hours), 0).label('total_work_hours'),
        func.coalesce(func.sum(WeeklyAttendance.class_work_hours), 0).label('total_class_hours'),
        func.coalesce(func.sum(WeeklyAttendance.overtime_hours), 0).label('total_overtime_hours'),
        func.coalesce(func.sum(WeeklyAttendance.absent_days), 0).label('total_absent_days'),
        func.count(WeeklyAttendance.record_id).label('attendance_weeks'),
        func.coalesce(func.sum(late_per_week.c.late_count), 0).label('total_late_count')
    ).outerjoin(
        WeeklyAttendance, Student.student_number == WeeklyAttendance.student_number
    ).outerjoin(
        late_per_week, WeeklyAttendance.record_id == late_per_week.c.weekly_record_id
    )

    # NOTE(review): the date bounds are applied in WHERE, so students without
    # any weekly record inside the range drop out of the list — confirm that
    # this is the intended behaviour.
    if start_date_obj is not None:
        base_query = base_query.filter(WeeklyAttendance.week_start_date >= start_date_obj)
    if end_date_obj is not None:
        base_query = base_query.filter(WeeklyAttendance.week_end_date <= end_date_obj)
    if student_criteria:
        base_query = base_query.filter(*student_criteria)

    students_stats = base_query.group_by(Student.student_id).all()

    def get_grade_info(grade, degree_type):
        """Return (display label, sort weight) for a grade/degree pair.

        Doctoral groups get weights around 10 and master's groups around 110,
        so doctoral groups sort first; within a level a higher grade yields a
        smaller weight and is therefore listed earlier.
        """
        level = grade or 0  # a missing grade must not break the arithmetic
        if degree_type in ['学博', '专博']:
            return f'博士{grade}年级', 10 - level
        if grade == 1:
            label = '研一'
        elif grade == 2:
            label = '研二'
        elif grade == 3:
            label = '研三'
        else:
            label = f'{grade}'
        return label, 110 - level

    # ---- Shape per-student rows and bucket them by grade label ----
    temp_grade_groups = {}
    all_students_data = []
    for stat in students_stats:
        grade_label, sort_weight = get_grade_info(stat.grade, stat.degree_type)
        total_work_hours = float(stat.total_work_hours)
        attendance_weeks = int(stat.attendance_weeks)
        student_data = {
            'student_number': stat.student_number,
            'name': stat.name,
            'grade': stat.grade,
            'grade_label': grade_label,
            'college': stat.college,
            'supervisor': stat.supervisor,
            'degree_type': stat.degree_type,
            'enrollment_date': stat.enrollment_date,
            'total_work_hours': total_work_hours,
            'total_class_hours': float(stat.total_class_hours),
            'total_overtime_hours': float(stat.total_overtime_hours),
            'total_absent_days': int(stat.total_absent_days),
            'total_late_count': int(stat.total_late_count),
            'attendance_weeks': attendance_weeks,
            'avg_weekly_hours': round(total_work_hours / attendance_weeks, 1) if attendance_weeks > 0 else 0
        }
        all_students_data.append(student_data)
        group = temp_grade_groups.setdefault(
            grade_label, {'students': [], 'sort_weight': sort_weight})
        group['students'].append(student_data)

    # Within each grade, list the students with the most hours first.
    for group in temp_grade_groups.values():
        group['students'].sort(key=lambda row: row['total_work_hours'], reverse=True)

    # Emit the grade groups in ascending sort-weight order.
    grade_groups = OrderedDict()
    for grade_label, group in sorted(temp_grade_groups.items(),
                                     key=lambda item: item[1]['sort_weight']):
        grade_groups[grade_label] = group['students']

    # ---- Overall figures ----
    hours_sum = sum(s['total_work_hours'] for s in all_students_data)
    overall_stats = {
        'total_students': len(all_students_data),
        'total_work_hours': hours_sum,
        'total_absent_days': sum(s['total_absent_days'] for s in all_students_data),
        'total_late_count': sum(s['total_late_count'] for s in all_students_data),
        'avg_work_hours_per_student': round(hours_sum / max(len(all_students_data), 1), 1)
    }

    # ---- Monthly totals ----
    monthly_query = db.session.query(
        func.date_format(WeeklyAttendance.week_start_date, '%Y-%m').label('month'),
        func.count(WeeklyAttendance.record_id).label('record_count'),
        func.sum(WeeklyAttendance.actual_work_hours).label('total_hours'),
        func.sum(WeeklyAttendance.absent_days).label('total_absent')
    )
    if start_date_obj is not None:
        monthly_query = monthly_query.filter(WeeklyAttendance.week_start_date >= start_date_obj)
    if end_date_obj is not None:
        monthly_query = monthly_query.filter(WeeklyAttendance.week_end_date <= end_date_obj)
    # The student table is only joined when a student-level filter was
    # supplied (even one that turned out to be an unusable grade value).
    if search or grade_filter or college_filter or supervisor_filter:
        monthly_query = monthly_query.join(
            Student, WeeklyAttendance.student_number == Student.student_number
        )
        if student_criteria:
            monthly_query = monthly_query.filter(*student_criteria)
    monthly_stats = monthly_query.group_by('month').order_by('month').all()

    # ---- Per-college totals ----
    # COUNT(DISTINCT ...) because the outer join yields one row per weekly
    # record; a plain COUNT would count records rather than students.
    college_query = db.session.query(
        Student.college,
        func.count(Student.student_id.distinct()).label('student_count'),
        func.coalesce(func.sum(WeeklyAttendance.actual_work_hours), 0).label('total_hours')
    ).outerjoin(WeeklyAttendance, Student.student_number == WeeklyAttendance.student_number)
    # Rows with no weekly record (NULL dates) are kept by the date bounds.
    if start_date_obj is not None:
        college_query = college_query.filter(
            or_(WeeklyAttendance.week_start_date.is_(None),
                WeeklyAttendance.week_start_date >= start_date_obj)
        )
    if end_date_obj is not None:
        college_query = college_query.filter(
            or_(WeeklyAttendance.week_end_date.is_(None),
                WeeklyAttendance.week_end_date <= end_date_obj)
        )
    college_stats = college_query.group_by(Student.college).all()

    # ---- Options for the filter controls ----
    colleges = db.session.query(Student.college).distinct().all()
    colleges = [c[0] for c in colleges if c[0]]
    supervisors = db.session.query(Student.supervisor).distinct().all()
    supervisors = [s[0] for s in supervisors if s[0]]
    grades = db.session.query(Student.grade).distinct().all()
    grades = sorted([g[0] for g in grades if g[0]])

    return render_template('admin/statistics.html',
                           grade_groups=grade_groups,
                           all_students_data=all_students_data,
                           overall_stats=overall_stats,
                           monthly_stats=monthly_stats,
                           college_stats=college_stats,
                           colleges=colleges,
                           supervisors=supervisors,
                           grades=grades,
                           search=search,
                           selected_grade=grade_filter,
                           selected_college=college_filter,
                           selected_supervisor=supervisor_filter,
                           start_date=start_date,
                           end_date=end_date)
@admin_bp.route('/statistics/export')
@admin_required
def export_statistics():
    """Download the per-student attendance totals as an Excel workbook.

    Aggregates every student's weekly attendance (worked, in-class and
    overtime hours, absent days, number of weekly records), writes one row
    per student to a sheet named '学生考勤统计', and returns the workbook as
    an attachment whose file name carries the current timestamp.
    """
    import pandas as pd
    from io import BytesIO

    # One aggregated row per student; students without records yield zeros.
    per_student = (
        db.session.query(
            Student.student_number,
            Student.name,
            Student.grade,
            Student.college,
            Student.supervisor,
            Student.degree_type,
            func.coalesce(func.sum(WeeklyAttendance.actual_work_hours), 0).label('total_work_hours'),
            func.coalesce(func.sum(WeeklyAttendance.class_work_hours), 0).label('total_class_hours'),
            func.coalesce(func.sum(WeeklyAttendance.overtime_hours), 0).label('total_overtime_hours'),
            func.coalesce(func.sum(WeeklyAttendance.absent_days), 0).label('total_absent_days'),
            func.count(WeeklyAttendance.record_id).label('attendance_weeks'),
        )
        .outerjoin(WeeklyAttendance, Student.student_number == WeeklyAttendance.student_number)
        .group_by(Student.student_id)
        .all()
    )

    def as_sheet_row(stat):
        """Map one aggregated result row onto the spreadsheet columns."""
        if stat.degree_type in ['学博', '专博']:
            grade_label = f'博士{stat.grade}年级'
        else:
            grade_label = f'{stat.grade}'
        worked = float(stat.total_work_hours)
        return {
            '学号': stat.student_number,
            '姓名': stat.name,
            '年级': grade_label,
            '学院': stat.college,
            '导师': stat.supervisor,
            '学位类型': stat.degree_type,
            '总出勤时长(小时)': worked,
            '班内工作时长(小时)': float(stat.total_class_hours),
            '加班时长(小时)': float(stat.total_overtime_hours),
            '缺勤天数': int(stat.total_absent_days),
            '考勤周数': int(stat.attendance_weeks),
            # max(..., 1) avoids dividing by zero for students without records.
            '周均工作时长': round(worked / max(stat.attendance_weeks, 1), 1),
        }

    frame = pd.DataFrame([as_sheet_row(stat) for stat in per_student])

    # Build the workbook in memory.
    buffer = BytesIO()
    with pd.ExcelWriter(buffer, engine='openpyxl') as writer:
        frame.to_excel(writer, sheet_name='学生考勤统计', index=False)
    buffer.seek(0)

    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    return send_file(
        buffer,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name=f"学生考勤统计_{stamp}.xlsx",
    )
@admin_bp.route('/students/add', methods=['GET', 'POST'])
@admin_required
def add_student():
    """Create a student together with the matching login account.

    GET renders the form.  POST accepts either JSON or form data with the
    required keys ``student_number``, ``name``, ``gender`` and ``grade`` and
    the optional keys ``password`` (defaults to '123456'), ``phone``,
    ``supervisor``, ``college``, ``major``, ``degree_type``, ``status`` and
    ``enrollment_date`` (``YYYY-MM-DD``).

    JSON requests receive ``{'success': bool, 'message': str}``.  Form
    requests are redirected to the student list on success, or get the form
    re-rendered with a flash message on failure.
    """
    form_template = 'admin/add_student.html'
    if request.method != 'POST':
        return render_template(form_template)

    wants_json = request.is_json

    def fail(message):
        """Report *message* in the representation the client asked for."""
        if wants_json:
            return jsonify({'success': False, 'message': message})
        flash(message, 'error')
        return render_template(form_template)

    try:
        data = request.get_json() if wants_json else request.form
        student_number = data['student_number']

        # Reject duplicates up front.
        if Student.query.filter_by(student_number=student_number).first():
            return fail('学号已存在')

        # Build (and thereby validate) the student before anything is
        # committed, so malformed input such as a non-numeric grade cannot
        # leave a login account without a student record.
        enrollment_raw = data.get('enrollment_date')
        student = Student(
            student_number=student_number,
            name=data['name'],
            gender=data['gender'],
            grade=int(data['grade']),
            phone=data.get('phone', ''),
            supervisor=data.get('supervisor', ''),
            college=data.get('college', ''),
            major=data.get('major', ''),
            degree_type=data.get('degree_type') if data.get('degree_type') else None,
            status=data.get('status', '在读'),
            enrollment_date=(datetime.strptime(enrollment_raw, '%Y-%m-%d').date()
                             if enrollment_raw else None)
        )

        # The account is stored first, then the student record.  A blank
        # password falls back to the default rather than being hashed as an
        # empty string.
        user = User(
            student_number=student_number,
            password_hash=generate_password_hash(data.get('password') or '123456'),
            role='student'
        )
        success, error = safe_add_and_commit(user)
        if not success:
            return fail(f'创建用户失败: {error}')

        success, error = safe_add_and_commit(student)
        if not success:
            # Undo the account created a moment ago so that a retry does not
            # trip over a leftover user with the same student number.
            safe_delete_and_commit(user)
            return fail(f'添加失败: {error}')

        if wants_json:
            return jsonify({'success': True, 'message': '学生添加成功'})
        flash('学生添加成功', 'success')
        return redirect(url_for('admin.student_list'))
    except Exception as e:
        return fail(f'添加失败: {str(e)}')
@admin_bp.route('/students/<string:student_number>/edit', methods=['GET', 'POST'])
@admin_required
def edit_student(student_number):
    """Edit a student's profile.

    Responds with 404 when the student does not exist.  POST accepts JSON or
    form data with the required keys ``name``, ``gender`` and ``grade`` plus
    the optional keys ``phone``, ``supervisor``, ``college``, ``major``,
    ``degree_type``, ``status`` and ``enrollment_date`` (``YYYY-MM-DD``; the
    stored date is left unchanged when the key is empty or absent).

    JSON requests receive ``{'success': bool, 'message': str}`` for POST and
    the student's current data otherwise.  Form requests are redirected to
    the detail page on success, or get the edit form with a flash message.
    """
    student = Student.query.filter_by(student_number=student_number).first_or_404()

    if request.method == 'POST':
        try:
            data = request.get_json() if request.is_json else request.form

            # Read and convert every value before touching the model, so a
            # missing key or malformed number/date cannot leave the object
            # half-updated when the form is rendered again.
            name = data['name']
            gender = data['gender']
            grade = int(data['grade'])
            enrollment_raw = data.get('enrollment_date')
            enrollment_date = (datetime.strptime(enrollment_raw, '%Y-%m-%d').date()
                               if enrollment_raw else None)

            student.name = name
            student.gender = gender
            student.grade = grade
            student.phone = data.get('phone', '')
            student.supervisor = data.get('supervisor', '')
            student.college = data.get('college', '')
            student.major = data.get('major', '')
            student.degree_type = data.get('degree_type') if data.get('degree_type') else None
            student.status = data.get('status', '在读')
            if enrollment_date is not None:
                student.enrollment_date = enrollment_date

            success, error = safe_commit()
            if success:
                if request.is_json:
                    return jsonify({'success': True, 'message': '学生信息更新成功'})
                flash('学生信息更新成功', 'success')
                return redirect(url_for('admin.student_detail', student_number=student_number))
            if request.is_json:
                return jsonify({'success': False, 'message': f'更新失败: {error}'})
            flash(f'更新失败: {error}', 'error')
        except Exception as e:
            if request.is_json:
                return jsonify({'success': False, 'message': f'更新失败: {str(e)}'})
            flash(f'更新失败: {str(e)}', 'error')

    # JSON clients get the current data for populating an edit dialog.
    if request.is_json:
        return jsonify({
            'success': True,
            'student': {
                'student_number': student.student_number,
                'name': student.name,
                'gender': student.gender,
                'grade': student.grade,
                'phone': student.phone,
                'supervisor': student.supervisor,
                'college': student.college,
                'major': student.major,
                'degree_type': student.degree_type,
                'status': student.status,
                'enrollment_date': student.enrollment_date.strftime('%Y-%m-%d') if student.enrollment_date else ''
            }
        })
    return render_template('admin/edit_student.html', student=student)
@admin_bp.route('/students/<string:student_number>/delete', methods=['POST'])
@admin_required
def delete_student(student_number):
    """Delete a student.

    Responds with 404 when the student does not exist.  JSON requests
    receive ``{'success': bool, 'message': str}``; other requests get a flash
    message and are redirected to the student list.
    """
    # Resolved outside the try block (as edit_student does) so that the 404
    # raised by first_or_404 is not swallowed by the broad handler below and
    # reported as a failed deletion.
    student = Student.query.filter_by(student_number=student_number).first_or_404()
    wants_json = request.is_json

    try:
        student_name = student.name  # captured before the row is gone
        # NOTE(review): the original comment states that the user row is
        # removed automatically through a foreign-key constraint — confirm
        # against the model definitions.
        success, error = safe_delete_and_commit(student)
        if success:
            if wants_json:
                return jsonify({'success': True, 'message': f'学生 {student_name} 删除成功'})
            flash(f'学生 {student_name} 删除成功', 'success')
            return redirect(url_for('admin.student_list'))
        if wants_json:
            return jsonify({'success': False, 'message': f'删除失败: {error}'})
        flash(f'删除失败: {error}', 'error')
    except Exception as e:
        if wants_json:
            return jsonify({'success': False, 'message': f'删除失败: {str(e)}'})
        flash(f'删除失败: {str(e)}', 'error')
    return redirect(url_for('admin.student_list'))
@admin_bp.route('/students/batch_action', methods=['POST'])
@admin_required
def batch_action():
    """Apply a bulk action to several students at once.

    Expects a JSON body with ``action`` ('delete' or 'graduate') and
    ``student_numbers`` (list of student numbers).  Always answers with
    ``{'success': bool, 'message': str}``; on success the message states how
    many students were actually affected.
    """
    try:
        # silent=True: a missing or unparsable body is treated as empty input
        # and reported through the regular validation message.
        data = request.get_json(silent=True) or {}
        action = data.get('action')
        student_numbers = data.get('student_numbers', [])
        if not student_numbers:
            return jsonify({'success': False, 'message': '请选择要操作的学生'})

        selected = Student.student_number.in_(student_numbers)

        if action == 'delete':
            # Delete through the session, one object at a time.
            students = Student.query.filter(selected).all()
            for student in students:
                db.session.delete(student)
            success, error = safe_commit()
            if success:
                # Report the rows that existed, not the numbers requested.
                return jsonify({'success': True, 'message': f'成功删除 {len(students)} 个学生'})
            return jsonify({'success': False, 'message': f'删除失败: {error}'})

        if action == 'graduate':
            # Bulk UPDATE; update() returns the number of matched rows.
            updated = Student.query.filter(selected).update(
                {'status': '毕业'}, synchronize_session=False
            )
            success, error = safe_commit()
            if success:
                return jsonify({'success': True, 'message': f'成功将 {updated} 个学生设为毕业状态'})
            return jsonify({'success': False, 'message': f'操作失败: {error}'})

        return jsonify({'success': False, 'message': '无效的操作'})
    except Exception as e:
        # Discard any pending deletes/updates from the failed attempt.
        db.session.rollback()
        return jsonify({'success': False, 'message': f'操作失败: {str(e)}'})
@admin_bp.route('/students/<string:student_number>/reset_password', methods=['POST'])
@admin_required
def reset_student_password(student_number):
    """Reset a student's login password.

    Responds with 404 when no account exists for the student number.  JSON
    requests may supply the new password under ``password``; otherwise, or
    when that value is blank, the password becomes '123456'.  JSON requests
    receive ``{'success': bool, 'message': str}``; other requests get a flash
    message and are redirected to the student's detail page.
    """
    # Resolved outside the try block so that the 404 raised by first_or_404
    # is not swallowed by the broad handler below.
    user = User.query.filter_by(student_number=student_number).first_or_404()
    wants_json = request.is_json

    try:
        new_password = '123456'
        if wants_json:
            payload = request.get_json() or {}
            # A blank password falls back to the default.
            new_password = payload.get('password') or '123456'
        user.password_hash = generate_password_hash(new_password)

        success, error = safe_commit()
        if success:
            if wants_json:
                return jsonify({'success': True, 'message': '密码重置成功'})
            flash('密码重置成功', 'success')
            return redirect(url_for('admin.student_detail', student_number=student_number))
        if wants_json:
            return jsonify({'success': False, 'message': f'重置失败: {error}'})
        flash(f'重置失败: {error}', 'error')
    except Exception as e:
        if wants_json:
            return jsonify({'success': False, 'message': f'重置失败: {str(e)}'})
        flash(f'重置失败: {str(e)}', 'error')
    return redirect(url_for('admin.student_detail', student_number=student_number))
@admin_bp.route('/students/<string:student_number>/toggle_status', methods=['POST'])
@admin_required
def toggle_student_status(student_number):
    """Flip the ``is_active`` flag of a student's login account.

    Responds with 404 when no account exists for the student number.  JSON
    requests receive ``{'success': bool, 'message': str}``; other requests
    get a flash message and are redirected to the student's detail page.
    """
    # Resolved outside the try block so that the 404 raised by first_or_404
    # is not swallowed by the broad handler below.
    user = User.query.filter_by(student_number=student_number).first_or_404()
    wants_json = request.is_json

    try:
        user.is_active = not user.is_active
        success, error = safe_commit()
        if success:
            status_text = '启用' if user.is_active else '禁用'
            if wants_json:
                return jsonify({'success': True, 'message': f'账户{status_text}成功'})
            flash(f'账户{status_text}成功', 'success')
        else:
            if wants_json:
                return jsonify({'success': False, 'message': f'操作失败: {error}'})
            flash(f'操作失败: {error}', 'error')
    except Exception as e:
        if wants_json:
            return jsonify({'success': False, 'message': f'操作失败: {str(e)}'})
        flash(f'操作失败: {str(e)}', 'error')
    return redirect(url_for('admin.student_detail', student_number=student_number))
@admin_bp.route('/attendance/<int:record_id>/details')
@admin_required
def attendance_record_details(record_id):
    """Show one weekly attendance record with its day-by-day breakdown.

    Responds with 404 when the weekly record does not exist.  For every
    daily detail the view computes the worked duration from check-in and
    check-out (a check-out earlier than the check-in is treated as falling
    on the next day) and unpacks ``remarks``: a value starting with '{' is
    parsed as JSON with the keys ``details`` and ``summary``; anything else
    is used verbatim as the summary.  The student's five other most recent
    weekly records are included for comparison.
    """
    import json
    import logging

    log = logging.getLogger(__name__)

    weekly_record = WeeklyAttendance.query.get_or_404(record_id)
    # May be None when no student row matches; the template receives it as is.
    student = Student.query.filter_by(student_number=weekly_record.student_number).first()
    daily_details = DailyAttendanceDetail.query.filter_by(
        weekly_record_id=record_id
    ).order_by(DailyAttendanceDetail.attendance_date).all()

    processed_daily_details = []
    for detail in daily_details:
        processed_detail = {
            'detail_id': detail.detail_id,
            'attendance_date': detail.attendance_date,
            'status': detail.status,
            'check_in_time': detail.check_in_time,
            'check_out_time': detail.check_out_time,
            'remarks': detail.remarks,
            'duration_hours': None,
            'detailed_info': None
        }

        # Worked duration in hours, rounded to one decimal place.
        if detail.check_in_time and detail.check_out_time:
            try:
                start_datetime = datetime.combine(detail.attendance_date, detail.check_in_time)
                end_datetime = datetime.combine(detail.attendance_date, detail.check_out_time)
                if end_datetime < start_datetime:
                    end_datetime += timedelta(days=1)  # shift ran past midnight
                duration = (end_datetime - start_datetime).total_seconds() / 3600
                processed_detail['duration_hours'] = round(duration, 1)
            except (TypeError, ValueError, OverflowError) as e:
                # Unusable date/time values: leave the duration unset.
                log.warning('计算工作时长失败: %s', e)

        # Remarks are either a JSON object or free text.
        if detail.remarks:
            summary = detail.remarks
            try:
                if detail.remarks.startswith('{'):
                    remarks_data = json.loads(detail.remarks)
                    processed_detail['detailed_info'] = remarks_data.get('details')
                    summary = remarks_data.get('summary', detail.remarks)
            except (ValueError, TypeError, AttributeError):
                # Not valid JSON after all: fall back to the raw text.
                summary = detail.remarks
            processed_detail['summary_remarks'] = summary

        processed_daily_details.append(processed_detail)

    # Day counts by status; a missing status matches none of them.
    total_days = len(processed_daily_details)
    present_days = sum(1 for d in processed_daily_details if d['status'] == '正常')
    late_days = sum(1 for d in processed_daily_details if '迟到' in (d['status'] or ''))
    absent_days = sum(1 for d in processed_daily_details if d['status'] == '缺勤')

    # Average hours per '正常' day; max(..., 1) avoids dividing by zero and a
    # missing hour total counts as 0.
    if processed_daily_details:
        avg_daily_hours = (weekly_record.actual_work_hours or 0) / max(present_days, 1)
    else:
        avg_daily_hours = 0

    # The student's five other most recent weekly records, for comparison.
    historical_records = WeeklyAttendance.query.filter_by(
        student_number=weekly_record.student_number
    ).filter(WeeklyAttendance.record_id != record_id).order_by(
        desc(WeeklyAttendance.week_start_date)
    ).limit(5).all()

    return render_template('admin/attendance_details.html',
                           weekly_record=weekly_record,
                           student=student,
                           daily_details=processed_daily_details,
                           total_days=total_days,
                           present_days=present_days,
                           late_days=late_days,
                           absent_days=absent_days,
                           avg_daily_hours=avg_daily_hours,
                           historical_records=historical_records)
@admin_bp.route('/attendance/<int:record_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_attendance_record(record_id):
    """Edit the aggregate figures of one weekly attendance record.

    GET renders the edit form. POST reads ``actual_work_hours``,
    ``class_work_hours``, ``absent_days`` and ``overtime_hours`` from the JSON
    body (when the request is JSON) or from the submitted form, stores them on
    the record and commits. A field that is absent from the payload is
    treated as 0.

    Args:
        record_id: Identifier of the ``WeeklyAttendance`` row to edit; an
            unknown id results in a 404 response.

    Returns:
        A redirect to the record's detail page after a successful update;
        otherwise the rendered edit form, with a flashed error message when
        the update failed.
    """
    weekly_record = WeeklyAttendance.query.get_or_404(record_id)

    if request.method == 'POST':
        try:
            data = request.get_json() if request.is_json else request.form

            # Convert every field BEFORE touching the record. If any value is
            # malformed (e.g. an empty string), the record must stay exactly
            # as loaded; otherwise the form below would be re-rendered from a
            # half-updated, uncommitted object.
            new_values = {
                'actual_work_hours': float(data.get('actual_work_hours', 0)),
                'class_work_hours': float(data.get('class_work_hours', 0)),
                'absent_days': int(data.get('absent_days', 0)),
                'overtime_hours': float(data.get('overtime_hours', 0)),
            }
            for attribute, value in new_values.items():
                setattr(weekly_record, attribute, value)

            success, error = safe_commit()
            if success:
                flash('考勤记录更新成功', 'success')
                return redirect(
                    url_for('admin.attendance_record_details', record_id=record_id)
                )
            flash(f'更新失败: {error}', 'error')
        except Exception as e:
            # View-level boundary: report any failure (bad input, unexpected
            # payload shape, commit helper error) to the user and fall through
            # to re-render the form.
            flash(f'更新失败: {str(e)}', 'error')

    return render_template('admin/edit_attendance_record.html', weekly_record=weekly_record)
def _export_redirect_to_attendance_list():
    """Redirect to the attendance list, keeping all query args except ``export``."""
    args = request.args.copy()
    # Drop the flag that triggered the export so the redirect shows the
    # normal page instead of requesting another export.
    args.pop('export', None)
    return redirect(url_for('admin.attendance_management', **args))


def _export_parse_date(raw_value):
    """Parse a ``YYYY-MM-DD`` string; return ``None`` when empty or malformed."""
    if not raw_value:
        return None
    try:
        return datetime.strptime(raw_value, '%Y-%m-%d').date()
    except ValueError:
        # Best effort: an unparsable date filter is ignored, not an error.
        return None


def _export_build_query(start_date, end_date, student_search, sort_by):
    """Build the filtered, sorted query yielding ``(WeeklyAttendance, late_count)`` rows."""
    from sqlalchemy import case, desc, func, or_

    # Per weekly record, count the daily detail rows whose status contains
    # "迟到"; coalesce turns the NULL sum of a record with no details into 0.
    late_count = func.coalesce(
        func.sum(
            case(
                (DailyAttendanceDetail.status.like('%迟到%'), 1),
                else_=0
            )
        ), 0
    ).label('late_count')

    query = db.session.query(WeeklyAttendance, late_count).outerjoin(
        DailyAttendanceDetail,
        WeeklyAttendance.record_id == DailyAttendanceDetail.weekly_record_id
    ).group_by(WeeklyAttendance.record_id)

    start_date_obj = _export_parse_date(start_date)
    if start_date_obj is not None:
        query = query.filter(WeeklyAttendance.week_start_date >= start_date_obj)

    end_date_obj = _export_parse_date(end_date)
    if end_date_obj is not None:
        query = query.filter(WeeklyAttendance.week_end_date <= end_date_obj)

    if student_search:
        query = query.filter(or_(
            WeeklyAttendance.name.contains(student_search),
            WeeklyAttendance.student_number.contains(student_search)
        ))

    # ``sort_by`` has the form "<field>_<asc|desc>". An unknown field falls
    # back to newest week first; an unknown direction is treated as "desc".
    sortable_columns = {
        'actual_work_hours': WeeklyAttendance.actual_work_hours,
        'week_start_date': WeeklyAttendance.week_start_date,
    }
    field, _, direction = (sort_by or '').rpartition('_')
    column = sortable_columns.get(field)
    if column is None:
        column = WeeklyAttendance.week_start_date
        direction = 'desc'
    ordering = column if direction == 'asc' else desc(column)
    return query.order_by(ordering)


def _export_rows(results):
    """Turn ``(record, late_count)`` query rows into dicts keyed by the sheet's column titles."""
    return [
        {
            '学号': record.student_number,
            '姓名': record.name,
            '周开始日期': record.week_start_date.strftime('%Y-%m-%d'),
            '周结束日期': record.week_end_date.strftime('%Y-%m-%d'),
            '实际出勤时长(小时)': float(record.actual_work_hours),
            '班内工作时长(小时)': float(record.class_work_hours),
            '旷工天数': int(record.absent_days),
            '迟到次数': int(late_count) if late_count else 0,
            '加班时长(小时)': float(record.overtime_hours),
            '记录创建时间': record.created_at.strftime('%Y-%m-%d %H:%M:%S')
        }
        for record, late_count in results
    ]


def _export_write_workbook(rows):
    """Write ``rows`` to an in-memory xlsx workbook and return the rewound buffer."""
    from io import BytesIO

    output = BytesIO()
    frame = pd.DataFrame(rows)
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        frame.to_excel(writer, sheet_name='考勤记录', index=False)

        # Size each column to its longest rendered cell (plus padding),
        # capped at 30 so one long value cannot blow up the layout.
        worksheet = writer.sheets['考勤记录']
        for column in worksheet.columns:
            column_letter = column[0].column_letter
            longest = max((len(str(cell.value)) for cell in column), default=0)
            worksheet.column_dimensions[column_letter].width = min(longest + 2, 30)

    # Rewind so the caller streams the file from its first byte.
    output.seek(0)
    return output


def export_attendance_data():
    """Export the filtered weekly attendance records as an Excel download.

    Reads the ``start_date``, ``end_date``, ``student_search`` and ``sort_by``
    query arguments of the current request, runs the matching query and
    writes one row per weekly record (including its late count) to a sheet
    named "考勤记录".

    Returns:
        The xlsx file as an attachment named ``考勤记录_<timestamp>.xlsx``.
        When no record matches, or when anything fails, a message is flashed
        and a redirect to the attendance list (same query arguments minus
        ``export``) is returned instead.
    """
    try:
        query = _export_build_query(
            start_date=request.args.get('start_date'),
            end_date=request.args.get('end_date'),
            student_search=request.args.get('student_search', '').strip(),
            sort_by=request.args.get('sort_by', 'week_start_date_desc'),
        )
        results = query.all()

        if not results:
            flash('没有数据可导出', 'warning')
            return _export_redirect_to_attendance_list()

        output = _export_write_workbook(_export_rows(results))

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"考勤记录_{timestamp}.xlsx"
        return send_file(
            output,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            as_attachment=True,
            download_name=filename
        )
    except Exception as e:
        # View-level boundary: surface the failure to the user and return to
        # the normal list page rather than answering with a 500.
        flash(f'导出失败: {str(e)}', 'error')
        return _export_redirect_to_attendance_list()