2024-11-14 15:43:33 +08:00

924 lines
35 KiB
Python

import io
import logging
import os
import random
import secrets
import shutil
import tempfile
import time
import zipfile
from functools import wraps
from urllib.parse import quote

import boto3
import mysql.connector
import pandas as pd
from boto3.exceptions import S3UploadFailedError
from botocore.client import Config
from botocore.exceptions import ClientError
from dotenv import load_dotenv
from flask import Flask, request, jsonify, redirect, url_for, render_template, session, make_response, flash
from flask import send_file
from flask_bcrypt import Bcrypt
from flask_mail import Mail, Message
# Load variables from a .env file into the environment before they are read below.
load_dotenv()
# Module-level Bcrypt helper, constructed without a Flask app argument.
bcrypt = Bcrypt()
# AWS S3 configuration, all read from environment variables.
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
region_name = os.getenv('AWS_REGION')
bucket_name = os.getenv('S3_BUCKET_NAME')
# Shared S3 client used by every helper in this module; requests are signed with SigV4.
s3_client = boto3.client(
's3',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name,
config=Config(signature_version='s3v4')
)
# Download helpers
def download_and_zip_files(bucket_name, folder_key):
    """Download every object under an S3 prefix and pack them into a ZIP64 archive.

    Args:
        bucket_name: Name of the S3 bucket to read from.
        folder_key: Key prefix ("folder") whose objects should be archived.

    Returns:
        Path of the created ``homework.zip`` inside a fresh temporary directory,
        or ``None`` when the prefix is empty or any step fails. On success the
        caller owns the temporary directory (the parent of the returned path)
        and is responsible for deleting it; on failure it is removed here.
    """
    temp_dir = tempfile.mkdtemp()
    # Downloads go into their own sub-directory so that an object that happens
    # to be called "homework.zip" can never collide with the archive itself.
    download_dir = os.path.join(temp_dir, 'files')
    zip_file_path = os.path.join(temp_dir, 'homework.zip')
    succeeded = False
    try:
        os.mkdir(download_dir)
        file_keys = list_files_in_folder(bucket_name, folder_key)
        if not file_keys:
            logging.error(f"No files found in folder: {folder_key}")
            return None
        for file_key in file_keys:
            file_name = os.path.basename(file_key)
            if not file_name:
                # Keys ending in "/" are folder placeholders, not files.
                continue
            # Files are flattened by base name, as before; same-named objects
            # in different sub-prefixes overwrite each other.
            file_download_path = os.path.join(download_dir, file_name)
            logging.info(f"Downloading {file_key} to {file_download_path}")
            s3_client.download_file(bucket_name, file_key, file_download_path)
        with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as zipf:
            for root, _, files in os.walk(download_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, start=download_dir)
                    logging.info(f"Adding {file} to archive as {arcname}")
                    zipf.write(file_path, arcname=arcname)
        logging.info(f"Zip file created at {zip_file_path}")
        succeeded = True
        return zip_file_path
    except Exception as e:
        logging.error(f"Error during download or zip operation: {e}")
        return None
    finally:
        if succeeded:
            # Keep only the archive; the raw downloads are no longer needed.
            shutil.rmtree(download_dir, ignore_errors=True)
        else:
            # Nothing is handed back to the caller, so remove everything.
            shutil.rmtree(temp_dir, ignore_errors=True)
def upload_zip_to_s3(zip_file_path, bucket_name, object_key):
    """Upload a ZIP archive to S3 and return a download URL for it.

    Args:
        zip_file_path: Local path of the archive to upload.
        bucket_name: Destination bucket.
        object_key: Destination object key.

    Returns:
        Whatever ``generate_presigned_download_url`` returns for the uploaded
        object, or ``None`` when the upload fails.
    """
    try:
        s3_client.upload_file(zip_file_path, bucket_name, object_key)
    except (ClientError, S3UploadFailedError) as e:
        # boto3's managed upload wraps service errors in S3UploadFailedError,
        # so catching ClientError alone would let upload failures escape.
        logging.error(f"Error uploading zip to S3: {e}")
        return None
    # NOTE(review): generate_presigned_download_url is not defined in the visible
    # part of this file - confirm it exists elsewhere in the module.
    return generate_presigned_download_url(bucket_name, object_key)
# List files
def list_files_in_folder(bucket_name, folder_key):
    """Return the keys of all objects stored under an S3 prefix.

    Args:
        bucket_name: Bucket to list.
        folder_key: Key prefix to list; the prefix key itself is excluded.

    Returns:
        A list of object keys; an empty list when nothing is found or the
        listing request fails.
    """
    try:
        # A single list_objects_v2 call returns at most 1000 keys; the
        # paginator follows continuation tokens so large folders are complete.
        paginator = s3_client.get_paginator('list_objects_v2')
        files = []
        found_contents = False
        for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_key):
            contents = page.get('Contents')
            if not contents:
                continue
            found_contents = True
            files.extend(obj['Key'] for obj in contents if obj['Key'] != folder_key)
    except ClientError as e:
        logging.error(f"Error listing files in folder {folder_key}: {e}")
        return []
    if not found_contents:
        logging.error(f"No contents found in S3 folder {folder_key}")
        return []
    logging.info(f"Files in folder {folder_key}: {files}")
    return files
# Database connection factory
def get_db_connection():
    """Open a new MySQL connection using the MYSQL_* environment variables."""
    settings = {
        'host': os.getenv('MYSQL_HOST'),
        'user': os.getenv('MYSQL_USER'),
        'password': os.getenv('MYSQL_PASSWORD'),
        'database': os.getenv('MYSQL_DB'),
    }
    return mysql.connector.connect(**settings)
# Student credential check
def validate_student(student_id, password, bcrypt):
    """Return the student row when the id/password pair is valid, else ``None``.

    Args:
        student_id: Value matched against ``students.id``.
        password: Plain-text password to check against the stored hash.
        bcrypt: Object providing ``check_password_hash(stored_hash, password)``.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('SELECT * FROM students WHERE id = %s', (student_id,))
            student = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()
    # A missing password (e.g. absent form field) can never match; checking it
    # first avoids handing None to the hash comparison.
    if student and password and bcrypt.check_password_hash(student['password'], password):
        return student
    return None
# Administrator credential check
def validate_admin(username, password):
    """Return the administrator row when username/password are valid, else ``None``.

    Uses the module-level ``bcrypt`` helper for the hash comparison.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('SELECT * FROM administrators WHERE username = %s', (username,))
            admin = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()
    # Guard against a missing password before calling the hash comparison.
    if admin and password and bcrypt.check_password_hash(admin['password'], password):
        return admin
    return None
# Teacher credential check
def validate_teacher(email, password, bcrypt):
    """Return the teacher row when the email/password pair is valid, else ``None``.

    Args:
        email: Value matched against ``teachers.email``.
        password: Plain-text password to check against the stored hash.
        bcrypt: Object providing ``check_password_hash(stored_hash, password)``.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('SELECT * FROM teachers WHERE email = %s', (email,))
            teacher = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()
    # Guard against a missing password before calling the hash comparison.
    if teacher and password and bcrypt.check_password_hash(teacher['password'], password):
        return teacher
    return None
def fetch_all_departments():
    """Return every row of the ``departments`` table as a list of dicts."""
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('SELECT * FROM departments')
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query fails.
        conn.close()
def fetch_all_grades():
    """Return every row of the ``grades`` table as a list of dicts."""
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('SELECT * FROM grades')
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query fails.
        conn.close()
def fetch_all_classes():
    """Return every row of the ``classes`` table as a list of dicts."""
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('SELECT * FROM classes')
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query fails.
        conn.close()
def fetch_all_teachers():
    """Return every row of the ``teachers`` table as a list of dicts."""
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('SELECT * FROM teachers')
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query fails.
        conn.close()
def fetch_teacher_classes(teacher_id):
    """Return ``id`` and ``name`` of every class linked to the given teacher.

    The link is read from the ``class_teacher`` join table.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('''
                SELECT c.id, c.name FROM classes c
                JOIN class_teacher ct ON c.id = ct.class_id
                WHERE ct.teacher_id = %s
            ''', (teacher_id,))
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query fails.
        conn.close()
def fetch_class_assignments(class_id):
    """Return all ``assignments`` rows whose ``class_id`` matches, as dicts."""
    query = '''
        SELECT * FROM assignments
        WHERE class_id = %s
    '''
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(query, (class_id,))
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query fails.
        conn.close()
def fetch_class_students(class_id):
    """Return all ``students`` rows belonging to the given class, as dicts."""
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('''
                SELECT * FROM students
                WHERE class_id = %s
            ''', (class_id,))
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query fails.
        conn.close()
# Append to the submission history table
def add_to_submission_history(student_id, assignment_id, filename):
    """Insert one row into ``submission_history`` stamped with the DB's ``NOW()``."""
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(
                'INSERT INTO submission_history (student_id, assignment_id, filename, submit_date) VALUES (%s, %s, %s, NOW())',
                (student_id, assignment_id, filename)
            )
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the INSERT fails.
        conn.close()
# Insert or update the current submission record
def add_or_update_submission(student_id, assignment_id, filename, code_verified=False):
    """Record a submission in the history table and in ``submissions``.

    Args:
        student_id: Submitting student's id.
        assignment_id: Assignment being submitted.
        filename: Stored file name of the submission.
        code_verified: When true an existing ``submissions`` row is updated
            (re-submission); otherwise a new row is inserted.
    """
    # The history table keeps every attempt, including overwrites.
    add_to_submission_history(student_id, assignment_id, filename)
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            if code_verified:
                cursor.execute(
                    'UPDATE submissions SET submit_date = NOW(), filename = %s WHERE student_id = %s AND assignment_id = %s',
                    (filename, student_id, assignment_id))
            else:
                cursor.execute(
                    'INSERT INTO submissions (student_id, assignment_id, filename, submit_date) VALUES (%s, %s, %s, NOW())',
                    (student_id, assignment_id, filename))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the statement fails.
        conn.close()
# Presigned upload URL
def generate_presigned_url(object_key, content_type, expiration=3600):
    """Return a presigned ``put_object`` URL for ``object_key``, or ``None`` on failure.

    The URL targets the module-level ``bucket_name`` and is valid for
    ``expiration`` seconds.
    """
    try:
        put_params = {
            'Bucket': bucket_name,
            'Key': object_key,
            'ContentType': content_type,
        }
        return s3_client.generate_presigned_url(
            'put_object', Params=put_params, ExpiresIn=expiration
        )
    except Exception as exc:
        logging.error(f"Failed to generate presigned URL: {str(exc)}")
        return None
# 添加管理员路由
def add_admin_routes(app, mail, bcrypt):
# Decorator: require an administrator session
def admin_required(f):
    """Wrap a view so that requests without ``admin_id`` in the session are
    redirected to the admin login page."""
    @wraps(f)
    def guarded(*args, **kwargs):
        if 'admin_id' in session:
            return f(*args, **kwargs)
        return redirect(url_for('admin_login'))
    return guarded
@app.route('/admin/add_department', methods=['POST'])
@admin_required
def add_department():
    """Create a department from the posted ``name`` and return to the admin panel."""
    name = request.form['name']
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('INSERT INTO departments (name) VALUES (%s)', (name,))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the INSERT fails.
        conn.close()
    flash('Department added successfully', 'success')
    return redirect(url_for('admin_panel'))
@app.route('/admin/add_grade', methods=['POST'])
@admin_required
def add_grade():
    """Create a grade from the posted ``year`` and return to the admin panel."""
    year = request.form['year']
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('INSERT INTO grades (year) VALUES (%s)', (year,))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the INSERT fails.
        conn.close()
    flash('Grade added successfully', 'success')
    return redirect(url_for('admin_panel'))
@app.route('/admin/add_class', methods=['POST'])
@admin_required
def add_class():
    """Create a class from the posted ``name``, ``department_id`` and ``grade_id``."""
    name = request.form['name']
    department_id = request.form['department_id']
    grade_id = request.form['grade_id']
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('INSERT INTO classes (name, department_id, grade_id) VALUES (%s, %s, %s)',
                           (name, department_id, grade_id))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the INSERT fails.
        conn.close()
    flash('Class added successfully', 'success')
    return redirect(url_for('admin_panel'))
@app.route('/admin/add_teacher', methods=['POST'])
@admin_required
def add_teacher():
    """Create a teacher account; the posted password is stored as a bcrypt hash."""
    name = request.form['name']
    email = request.form['email']
    password = bcrypt.generate_password_hash(request.form['password']).decode('utf-8')
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('INSERT INTO teachers (name, email, password) VALUES (%s, %s, %s)',
                           (name, email, password))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the INSERT fails.
        conn.close()
    flash('Teacher added successfully', 'success')
    return redirect(url_for('admin_panel'))
@app.route('/admin/add_administrator', methods=['POST'])
@admin_required
def add_administrator():
    """Create an administrator account linked to ``teacher_id``; the password
    is stored as a bcrypt hash."""
    username = request.form['username']
    password = bcrypt.generate_password_hash(request.form['password']).decode('utf-8')
    teacher_id = request.form['teacher_id']
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('INSERT INTO administrators (username, password, teacher_id) VALUES (%s, %s, %s)',
                           (username, password, teacher_id))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the INSERT fails.
        conn.close()
    flash('Administrator added successfully', 'success')
    return redirect(url_for('admin_panel'))
@app.route('/admin/assign_teacher', methods=['POST'])
@admin_required
def assign_teacher():
    """Link a teacher to a class by inserting into the ``class_teacher`` table."""
    class_id = request.form['class_id']
    teacher_id = request.form['teacher_id']
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('INSERT INTO class_teacher (class_id, teacher_id) VALUES (%s, %s)', (class_id, teacher_id))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the INSERT fails.
        conn.close()
    flash('Teacher assigned to class successfully', 'success')
    return redirect(url_for('admin_panel'))
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
    """Show the admin login form (GET) or authenticate the posted credentials (POST)."""
    if request.method != 'POST':
        return render_template('admin_login.html')
    admin = validate_admin(request.form['username'], request.form['password'])
    if not admin:
        flash('Invalid credentials', 'error')
        return render_template('admin_login.html')
    # Successful login: remember the administrator and open the panel.
    session['admin_id'] = admin['id']
    return redirect(url_for('admin_panel'))
@app.route('/admin/logout')
def admin_logout():
    """Drop the administrator id from the session and go back to the admin login page."""
    session.pop('admin_id', None)
    login_url = url_for('admin_login')
    return redirect(login_url)
@app.route('/admin/panel')
@admin_required
def admin_panel():
    """Render the admin panel with all departments, grades, classes and teachers."""
    context = {
        'departments': fetch_all_departments(),
        'grades': fetch_all_grades(),
        'classes': fetch_all_classes(),
        'teachers': fetch_all_teachers(),
    }
    return render_template('admin_panel.html', **context)
@app.route('/admin/add_assignment', methods=['POST'])
@admin_required
def admin_add_assignment():
    """Create an assignment (no class attached) from the posted ``value``,
    ``name`` and ``deadline``."""
    value = request.form['value']
    name = request.form['name']
    deadline = request.form['deadline']
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('INSERT INTO assignments (value, name, deadline) VALUES (%s, %s, %s)', (value, name, deadline))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the INSERT fails.
        conn.close()
    flash('Assignment added successfully', 'success')
    return redirect(url_for('admin_panel'))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the student login form (GET) or authenticate a student (POST)."""
    if request.method != 'POST':
        return render_template('login.html')
    form = request.form
    student = validate_student(form.get('student_id'), form.get('password'), bcrypt)
    if not student:
        return render_template('login.html', error='学号或密码错误')
    # Remember who is logged in; the name is later used to build upload file names.
    session['student_id'] = student['id']
    session['student_name'] = student['name']
    return redirect(url_for('serve_index'))
@app.route('/reset-password', methods=['GET', 'POST'])
def reset_password():
    """Two-step student password reset driven by an e-mailed 6-digit code.

    POST without ``code``: verify that ``student_id`` and ``email`` belong to
    the same student, store a fresh code in the session and e-mail it.
    POST with ``code``: check the code against the session and, when it
    matches, store the bcrypt hash of ``new_password``.
    """
    if request.method != 'POST':
        return render_template('reset_password.html')
    student_id = request.form.get('student_id')
    email = request.form.get('email')
    code = request.form.get('code')
    new_password = request.form.get('new_password')
    if code:
        expected_code = session.get('reset_code')
        # Compare as bytes in constant time; a missing session code never matches.
        code_ok = (
            expected_code is not None
            and secrets.compare_digest(str(expected_code).encode('utf-8'), code.encode('utf-8'))
            and session.get('reset_student_id') == student_id
        )
        if not code_ok:
            return render_template('reset_password.html', error='验证码错误')
        if not new_password:
            # Hashing a missing/empty password would raise; ask the user again.
            return render_template('reset_password.html', error='请输入新密码')
        hashed_password = bcrypt.generate_password_hash(new_password).decode('utf-8')
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute('UPDATE students SET password = %s WHERE id = %s', (hashed_password, student_id))
                conn.commit()
            finally:
                cursor.close()
        finally:
            conn.close()
        # A reset code is single-use: discard it once the password is changed.
        session.pop('reset_code', None)
        session.pop('reset_student_id', None)
        return render_template('login.html', success='密码已成功重置,请使用新密码登录')
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('SELECT * FROM students WHERE id = %s AND email = %s', (student_id, email))
            student = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        conn.close()
    if not student:
        return render_template('reset_password.html', error='学号和邮箱不匹配')
    # Security-sensitive token: draw digits from the OS CSPRNG, not `random`.
    reset_code = ''.join(secrets.choice('0123456789') for _ in range(6))
    # NOTE(review): if the app uses Flask's default cookie session, session data
    # is signed but readable by the client, which would expose this code -
    # confirm a server-side session backend is configured.
    session['reset_code'] = reset_code
    session['reset_student_id'] = student_id
    try:
        msg = Message('重置密码验证码', recipients=[email])
        msg.body = f'您用于重置密码的验证码是: {reset_code}'
        mail.send(msg)
        return render_template('reset_password.html',
                               success='验证码已发送到您的邮箱,请检查并输入验证码')
    except Exception as e:
        logging.error(f"Error sending email: {str(e)}")
        return render_template('reset_password.html', error='发送验证码失败,请稍后再试')
@app.route('/record-submission', methods=['POST'])
def record_submission():
    """Register a first-time submission and return a presigned S3 upload URL.

    Expects JSON with ``assignment`` and ``filename``; the student comes from
    the session. If the student already submitted this assignment, the file
    name is parked in the session and 401 is returned so the client can go
    through the e-mail code flow to overwrite it.
    """
    # A JSON body of `null` parses to None; treat it like an empty object.
    data = request.json or {}
    student_id = session.get('student_id')
    student_name = session.get('student_name')
    assignment = data.get('assignment')
    filename = data.get('filename')  # original file name supplied by the front end
    if not student_id or not filename or not assignment:
        return jsonify({'error': '学号、作业和文件名是必要参数'}), 400
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('SELECT * FROM submissions WHERE student_id = %s AND assignment_id = %s',
                           (student_id, assignment))
            submission = cursor.fetchone()  # existing submission for this assignment, if any
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query fails.
        conn.close()
    if submission:
        # Keep the new file name so `validate-code` can finish the overwrite.
        session['filename'] = filename
        return jsonify({'error': '作业已提交过,请输入验证码继续'}), 401
    # Only the extension of the uploaded name is kept; the stored name is
    # rebuilt from student id, student name and assignment.
    _, file_extension = os.path.splitext(filename)
    new_filename = f'{student_id}_{student_name}_{assignment}{file_extension}'
    folder_name = f'sure_homework_define_by_qin/{assignment}'
    object_key = f'{folder_name}/{new_filename}'
    url = generate_presigned_url(object_key, 'application/octet-stream')
    if not url:
        logging.error("Failed to generate presigned URL")
        return jsonify({'error': '无法生成预签名 URL'}), 500
    # The record is written before the client performs the actual upload.
    add_or_update_submission(student_id, assignment, new_filename, code_verified=False)
    return jsonify({'status': 'success', 'upload_url': url})
@app.route('/generate-code', methods=['POST'])
def generate_code():
    """E-mail the logged-in student a 6-digit code authorising a re-submission.

    The code, student id and assignment are stored in the session for
    `validate-code` to check.
    """
    student_id = session.get('student_id')
    # A JSON body of `null` parses to None; treat it like an empty object.
    assignment = (request.json or {}).get('assignment')
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('SELECT email FROM students WHERE id = %s', (student_id,))
            student = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query fails.
        conn.close()
    if not student:
        return jsonify({'error': '学生信息未找到'}), 404
    email = student['email']
    # Security-sensitive token: draw digits from the OS CSPRNG, not `random`.
    reset_code = ''.join(secrets.choice('0123456789') for _ in range(6))
    session['submission_code'] = reset_code
    session['submission_student_id'] = student_id
    session['submission_assignment'] = assignment
    try:
        msg = Message('提交作业验证码', recipients=[email])
        msg.body = f'您用于提交作业的验证码是: {reset_code}'
        mail.send(msg)
        return jsonify({'status': '验证码已发送到您的邮箱,请检查并输入验证码'})
    except Exception as e:
        logging.error(f"Error sending email: {str(e)}")
        return jsonify({'error': '发送验证码失败,请稍后再试'}), 500
@app.route('/validate-code', methods=['POST'])
def validate_code():
    """Verify a re-submission code and return a presigned URL for the overwrite.

    On success the student's previous upload(s) for this assignment are deleted
    from S3, the ``submissions`` row is updated and the code is cleared from
    the session.
    """
    # A JSON body of `null` parses to None; treat it like an empty object.
    request_data = request.json or {}
    student_id = session.get('student_id')
    assignment = request_data.get('assignment')
    code = request_data.get('code')
    # Require a real code and a logged-in student first: otherwise an empty
    # request against an empty session would "match" because None == None.
    code_matches = (
        bool(code)
        and bool(student_id)
        and code == session.get('submission_code')
        and student_id == session.get('submission_student_id')
        and assignment == session.get('submission_assignment')
    )
    if not code_matches:
        return jsonify({'error': '验证码错误'}), 400
    # File name parked in the session by `record-submission`.
    filename = session.pop('filename', None)
    if not filename:
        return jsonify({'error': 'No file was found in session'}), 400
    _, file_extension = os.path.splitext(filename)
    new_filename = f'{student_id}_{session.get("student_name")}_{assignment}{file_extension}'
    folder_name = f'sure_homework_define_by_qin/{assignment}'
    # Prefix without extension, so a previous upload with a different
    # extension is found and removed as well.
    object_key_prefix = f'{folder_name}/{student_id}_{session.get("student_name")}_{assignment}'
    delete_old_files_in_s3(object_key_prefix)
    object_key = f'{object_key_prefix}{file_extension}'
    logging.info(f"Generated object_key: {object_key}")
    url = generate_presigned_url(object_key, 'application/octet-stream')
    if not url:
        logging.error("Failed to generate presigned URL")
        return jsonify({'error': 'Failed to generate presigned URL'}), 500
    add_or_update_submission(student_id, assignment, new_filename, code_verified=True)
    # The code is single-use: drop it together with its bookkeeping.
    session.pop('submission_code', None)
    session.pop('submission_student_id', None)
    session.pop('submission_assignment', None)
    session['validated_assignment'] = assignment
    session['validation_presigned_url'] = url
    return jsonify({'status': '成功', 'upload_url': url})
# Helper: delete every object in the bucket whose key starts with the given
# prefix (whatever its extension)
def delete_old_files_in_s3(object_key_prefix):
    """Delete all objects in ``bucket_name`` whose key begins with ``object_key_prefix``."""
    listing = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=object_key_prefix)
    for entry in listing.get('Contents', []):
        stale_key = entry['Key']
        s3_client.delete_object(Bucket=bucket_name, Key=stale_key)
        logging.info(f"Deleted old file from S3: {stale_key}")
@app.route('/api/submissions')
def get_submissions():
    """Return the full submission history (newest first) as JSON.

    NOTE(review): this endpoint performs no login check - confirm that
    exposing all students' submissions is intended.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("""
                    SELECT
                    sh.submit_date AS 时间,
                    sh.filename AS 提交的文件,
                    s.name AS 姓名,
                    s.id AS 学号,
                    sh.assignment_id AS 作业
                    FROM submission_history sh
                    JOIN students s ON sh.student_id = s.id
                    ORDER BY sh.submit_date DESC
                """)
                submissions = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # Release the connection on the error path too.
            conn.close()
        # Datetimes are rendered as strings for the JSON response.
        for sub in submissions:
            sub['时间'] = sub['时间'].strftime('%Y-%m-%d %H:%M:%S') if sub['时间'] else None
        return jsonify(submissions)
    except Exception as e:
        logging.error(f"Error in get_submissions: {str(e)}", exc_info=True)
        return jsonify({'error': str(e)}), 500
@app.route('/api/assignment-status')
def get_assignment_status():
    """Return, as JSON, one row per (student, assignment) with its submission state.

    Only assignments that already have at least one row in ``submissions``
    appear, because the assignment list is derived from that table.

    NOTE(review): this endpoint performs no login check - confirm that is intended.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("""
                    SELECT
                    s.id AS 学号,
                    s.name AS 姓名,
                    a.assignment_id AS 作业,
                    CASE WHEN sub.id IS NOT NULL THEN '已提交' ELSE '未提交' END AS 提交情况,
                    COALESCE(sub.filename, '') AS 提交的文件
                    FROM students s
                    CROSS JOIN (SELECT DISTINCT assignment_id FROM submissions) a
                    LEFT JOIN submissions sub ON s.id = sub.student_id AND a.assignment_id = sub.assignment_id
                    ORDER BY s.id, a.assignment_id
                """)
                status_data = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # Release the connection on the error path too.
            conn.close()
        return jsonify(status_data)
    except Exception as e:
        logging.error(f"Error in get_assignment_status: {str(e)}", exc_info=True)
        return jsonify({'error': str(e)}), 500
@app.route('/download-submissions')
def download_submissions():
    """Send the submission history as an Excel workbook (``submissions.xlsx``).

    NOTE(review): this endpoint performs no login check - confirm that is intended.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("""
                    SELECT
                    sh.submit_date AS 时间,
                    sh.filename AS 提交的项目,
                    s.name AS 姓名,
                    s.id AS 学号,
                    sh.assignment_id AS 作业
                    FROM submission_history sh
                    JOIN students s ON sh.student_id = s.id
                    ORDER BY sh.submit_date DESC
                """)
                submissions = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # Release the connection on the error path too.
            conn.close()
        # Passing the columns explicitly fixes their order and keeps them
        # present when the history is empty (otherwise df['时间'] raises KeyError).
        columns = ['时间', '提交的项目', '姓名', '学号', '作业']
        df = pd.DataFrame(submissions, columns=columns)
        df['时间'] = pd.to_datetime(df['时间']).dt.strftime('%Y-%m-%d %H:%M:%S')
        # The workbook is built in memory and streamed to the client.
        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            df.to_excel(writer, sheet_name='提交记录', index=False)
            worksheet = writer.sheets['提交记录']
            # Column widths, in the same order as `columns`.
            worksheet.set_column('A:A', 20)
            worksheet.set_column('B:B', 30)
            worksheet.set_column('C:C', 15)
            worksheet.set_column('D:D', 15)
            worksheet.set_column('E:E', 20)
        output.seek(0)
        return send_file(
            output,
            as_attachment=True,
            download_name='submissions.xlsx',
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        logging.error(f"Error in download_submissions: {str(e)}", exc_info=True)
        return jsonify({'error': str(e)}), 500
@app.route('/download-assignment-status')
def download_assignment_status():
    """Send per-student submission status as an Excel workbook
    (``assignment_status.xlsx``).

    NOTE(review): this endpoint performs no login check - confirm that is intended.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("""
                    SELECT
                    s.id AS 学号,
                    s.name AS 姓名,
                    COALESCE(sub.assignment_id, 'No assignment') AS 作业,
                    CASE WHEN sub.id IS NOT NULL THEN '已提交' ELSE '未提交' END AS 作业提交情况,
                    COALESCE(sub.filename, '') AS 提交的项目,
                    COALESCE(sub.submit_date, '') AS 提交时间
                    FROM students s
                    LEFT JOIN submissions sub ON s.id = sub.student_id
                    ORDER BY s.id, sub.assignment_id
                """)
                status_data = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # Release the connection on the error path too.
            conn.close()
        # Passing the columns explicitly fixes their order and keeps them
        # present when there are no rows (otherwise df['提交时间'] raises KeyError).
        columns = ['学号', '姓名', '作业', '作业提交情况', '提交的项目', '提交时间']
        df = pd.DataFrame(status_data, columns=columns)
        df['提交时间'] = pd.to_datetime(df['提交时间']).dt.strftime('%Y-%m-%d %H:%M:%S')
        # The workbook is built in memory and streamed to the client.
        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            df.to_excel(writer, sheet_name='作业提交情况', index=False)
            worksheet = writer.sheets['作业提交情况']
            # Column widths, in the same order as `columns`.
            worksheet.set_column('A:A', 15)
            worksheet.set_column('B:B', 10)
            worksheet.set_column('C:C', 15)
            worksheet.set_column('D:D', 12)
            worksheet.set_column('E:E', 30)
            worksheet.set_column('F:F', 20)
        output.seek(0)
        return send_file(
            output,
            as_attachment=True,
            download_name='assignment_status.xlsx',
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        logging.error(f"Error in download_assignment_status: {str(e)}", exc_info=True)
        return jsonify({'error': str(e)}), 500
@app.route('/')
def serve_index():
    """Render the student home page listing all assignments ordered by deadline.

    Students who are not logged in are redirected to the login page.
    """
    if 'student_id' not in session:
        return redirect(url_for('login'))
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute('SELECT * FROM assignments ORDER BY deadline')
            assignments = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the query fails.
        conn.close()
    return render_template('index.html', assignments=assignments)
@app.route('/logout')
def logout():
    """Wipe the whole session and send the user to the student login page."""
    session.clear()
    login_url = url_for('login')
    return redirect(login_url)
def _build_cors_preflight_response():
    """Build an empty response carrying permissive CORS preflight headers."""
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"),
        ("Access-Control-Allow-Headers", "Content-Type"),
    )
    response = make_response()
    for header_name, header_value in cors_headers:
        response.headers.add(header_name, header_value)
    return response
# 教师管理路由
def add_teacher_routes(app, mail, bcrypt):
# Decorator: require a teacher session
def teacher_required(f):
    """Wrap a view so that requests without ``teacher_id`` in the session are
    redirected to the teacher login page."""
    @wraps(f)
    def guarded(*args, **kwargs):
        if 'teacher_id' in session:
            return f(*args, **kwargs)
        return redirect(url_for('teacher_login'))
    return guarded
# Teacher login route
@app.route('/teacher/login', methods=['GET', 'POST'])
def teacher_login():
    """Show the teacher login form (GET) or authenticate the posted credentials (POST)."""
    if request.method != 'POST':
        return render_template('teacher_login.html')
    teacher = validate_teacher(request.form['email'], request.form['password'], bcrypt)
    if not teacher:
        flash('Invalid credentials', 'error')
        return render_template('teacher_login.html')
    # Successful login: remember the teacher and open the panel.
    session['teacher_id'] = teacher['id']
    return redirect(url_for('teacher_panel'))
@app.route('/teacher/logout')
def teacher_logout():
    """Drop the teacher id from the session and go back to the teacher login page."""
    session.pop('teacher_id', None)
    login_url = url_for('teacher_login')
    return redirect(login_url)
@app.route('/teacher/panel')
@teacher_required
def teacher_panel():
    """Render the teacher panel listing the classes linked to the logged-in teacher."""
    taught_classes = fetch_teacher_classes(session['teacher_id'])
    return render_template('teacher_panel.html', classes=taught_classes)
@app.route('/teacher/download-assignment/<assignment_value>', methods=['GET'])
def download_assignment(assignment_value):
    """Zip every uploaded file of an assignment and send the archive to the teacher.

    Requires a teacher session; otherwise redirects to the teacher login page.
    The temporary directory created for the archive is removed once the
    response has been sent (or immediately if sending fails).
    """
    teacher_id = session.get('teacher_id')
    if not teacher_id:
        return redirect(url_for('teacher_login'))
    # S3 prefix holding this assignment's uploads.
    folder_key = f"sure_homework_define_by_qin/{assignment_value}/"
    zip_file_path = download_and_zip_files(os.getenv('S3_BUCKET_NAME'), folder_key)
    if not zip_file_path or not os.path.exists(zip_file_path):
        logging.error(f"Failed to create ZIP file at {zip_file_path}")
        return jsonify({"error": "无法创建 zip 文件"}), 500
    # download_and_zip_files places the archive in its own mkdtemp() directory,
    # so removing the parent directory removes everything it created.
    temp_dir = os.path.dirname(zip_file_path)

    def _cleanup():
        shutil.rmtree(temp_dir, ignore_errors=True)

    try:
        # Percent-encode the name so non-ASCII assignment values survive in the header.
        filename = quote(f"{assignment_value}.zip")
        logging.info(f"Sending file: {zip_file_path} as {filename}")
        response = send_file(zip_file_path, as_attachment=True, download_name=filename, mimetype='application/zip')
    except Exception as ex:
        _cleanup()
        logging.error(f"Error sending file: {str(ex)}")
        return jsonify({"error": "Error sending file", "details": str(ex)}), 500
    # Delete the temporary files only after the response body has been
    # streamed and closed, otherwise they would accumulate on disk.
    response.call_on_close(_cleanup)
    return response
@app.route('/teacher/class/<int:class_id>')
@teacher_required
def view_class(class_id):
    """Render the detail page of one class with its assignments and students."""
    context = {
        'assignments': fetch_class_assignments(class_id),
        'students': fetch_class_students(class_id),
        'class_id': class_id,
    }
    return render_template('class_detail.html', **context)
@app.route('/teacher/class/<int:class_id>/add_assignment', methods=['POST'])
@teacher_required
def teacher_add_assignment(class_id):
    """Create an assignment for ``class_id`` from the posted ``value``, ``name``
    and ``deadline``, then return to the class page.

    NOTE(review): the route does not check that the logged-in teacher is
    linked to ``class_id`` - confirm whether that is required.
    """
    value = request.form['value']
    name = request.form['name']
    deadline = request.form['deadline']
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('INSERT INTO assignments (value, name, deadline, class_id) VALUES (%s, %s, %s, %s)', (value, name, deadline, class_id))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the INSERT fails.
        conn.close()
    flash('Assignment added successfully', 'success')
    return redirect(url_for('view_class', class_id=class_id))
@app.route('/teacher/edit_assignment/<int:assignment_id>', methods=['POST'])
def edit_assignment(assignment_id):
    """Change the deadline of an assignment from a JSON body ``{"deadline": ...}``.

    Returns 401 without a teacher session, 400 when no deadline is supplied.
    """
    # Like the other teacher routes this must not be callable anonymously;
    # a JSON 401 is returned because the endpoint is consumed as an API.
    if 'teacher_id' not in session:
        return jsonify({'error': 'Authentication required'}), 401
    # A JSON body of `null` parses to None; treat it like an empty object.
    new_deadline = (request.json or {}).get('deadline')
    if not new_deadline:
        return jsonify({'error': 'Invalid deadline'}), 400
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('UPDATE assignments SET deadline = %s WHERE id = %s', (new_deadline, assignment_id))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the UPDATE fails.
        conn.close()
    return jsonify({'status': 'success'})
@app.route('/teacher/delete_assignment/<int:assignment_id>', methods=['DELETE'])
def delete_assignment(assignment_id):
    """Delete an assignment by id.

    Returns 401 without a teacher session.
    """
    # Like the other teacher routes this must not be callable anonymously;
    # a JSON 401 is returned because the endpoint is consumed as an API.
    if 'teacher_id' not in session:
        return jsonify({'error': 'Authentication required'}), 401
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('DELETE FROM assignments WHERE id = %s', (assignment_id,))
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the connection even if the DELETE fails.
        conn.close()
    return jsonify({'status': 'success'})