"""Route registration and helpers for a homework-submission Flask service.

The module wires three groups of views onto a Flask app:

* ``add_admin_routes`` registers the administrator views and, as in the
  original layout, the student-facing views (login, password reset,
  submission recording, reporting/export endpoints).
* ``add_teacher_routes`` registers the teacher views.

Files are stored in S3 under ``sure_homework_define_by_qin/<assignment>/``
and metadata is stored in MySQL.
"""
import io
import logging
import os
import random
import secrets
import shutil
import tempfile
import time
import zipfile
from contextlib import contextmanager
from functools import wraps
from urllib.parse import quote

import boto3
import mysql.connector
import pandas as pd
from boto3.exceptions import S3UploadFailedError
from botocore.client import Config
from botocore.exceptions import ClientError
from dotenv import load_dotenv
from flask import Flask, request, jsonify, redirect, url_for, render_template, session, make_response, flash
from flask import send_file
from flask_bcrypt import Bcrypt
from flask_mail import Mail, Message

load_dotenv()

bcrypt = Bcrypt()

# AWS S3 configuration (read from the environment / .env file).
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
region_name = os.getenv('AWS_REGION')
bucket_name = os.getenv('S3_BUCKET_NAME')

s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name,
    config=Config(signature_version='s3v4')
)

# Root "folder" (key prefix) under which every assignment's uploads live.
S3_HOMEWORK_ROOT = 'sure_homework_define_by_qin'

_XLSX_MIMETYPE = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'


# ---------------------------------------------------------------------------
# S3 helpers
# ---------------------------------------------------------------------------

def download_and_zip_files(bucket_name, folder_key):
    """Download every object under ``folder_key`` and pack it into a ZIP64 archive.

    Args:
        bucket_name: S3 bucket to read from.
        folder_key: Key prefix ("folder") whose objects are downloaded.

    Returns:
        Path of the created ``homework.zip`` inside a fresh temporary
        directory, or ``None`` when the folder is empty or any step fails.
        On success the caller owns the temporary directory
        (``os.path.dirname`` of the returned path) and must remove it.
    """
    temp_dir = tempfile.mkdtemp()
    # Downloads go to a sub-directory so the archive can never include itself
    # and an S3 object named "homework.zip" cannot collide with the archive.
    staging_dir = os.path.join(temp_dir, 'files')
    zip_file_path = os.path.join(temp_dir, 'homework.zip')

    try:
        os.mkdir(staging_dir)

        # Keys ending in "/" are folder placeholders with no file name.
        file_keys = [
            key for key in list_files_in_folder(bucket_name, folder_key)
            if os.path.basename(key)
        ]
        if not file_keys:
            logging.error(f"No files found in folder: {folder_key}")
            shutil.rmtree(temp_dir, ignore_errors=True)
            return None

        for file_key in file_keys:
            file_name = os.path.basename(file_key)
            file_download_path = os.path.join(staging_dir, file_name)
            logging.info(f"Downloading {file_key} to {file_download_path}")
            s3_client.download_file(bucket_name, file_key, file_download_path)

        with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as zipf:
            for root, _, files in os.walk(staging_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, start=staging_dir)
                    logging.info(f"Adding {file} to archive as {arcname}")
                    zipf.write(file_path, arcname=arcname)

        # The raw downloads are no longer needed once they are archived.
        shutil.rmtree(staging_dir, ignore_errors=True)

        logging.info(f"Zip file created at {zip_file_path}")
        return zip_file_path

    except Exception as e:
        logging.error(f"Error during download or zip operation: {e}")
        # Do not leave a half-filled temporary directory behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
        return None


def generate_presigned_download_url(bucket_name, object_key, expiration=3600):
    """Return a presigned GET URL for ``object_key``, or ``None`` on failure.

    Args:
        bucket_name: S3 bucket holding the object.
        object_key: Key of the object to expose.
        expiration: Lifetime of the URL in seconds.
    """
    try:
        return s3_client.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': object_key},
            ExpiresIn=expiration
        )
    except ClientError as e:
        logging.error(f"Failed to generate presigned download URL: {e}")
        return None


def upload_zip_to_s3(zip_file_path, bucket_name, object_key):
    """Upload a ZIP archive to S3 and return a presigned download URL.

    Returns:
        The presigned URL, or ``None`` when the upload fails.
    """
    try:
        s3_client.upload_file(zip_file_path, bucket_name, object_key)
    except (ClientError, S3UploadFailedError) as e:
        logging.error(f"Error uploading zip to S3: {e}")
        return None

    return generate_presigned_download_url(bucket_name, object_key)


def list_files_in_folder(bucket_name, folder_key):
    """List the object keys under ``folder_key``, excluding the prefix itself.

    All result pages are read, so folders with more than one page of objects
    are listed completely.

    Returns:
        A list of keys; empty when nothing is found or the listing fails.
    """
    files = []
    try:
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_key):
            for obj in page.get('Contents', []):
                if obj['Key'] != folder_key:
                    files.append(obj['Key'])
    except ClientError as e:
        logging.error(f"Error listing files in folder {folder_key}: {e}")
        return []

    if not files:
        logging.error(f"No contents found in S3 folder {folder_key}")
        return []

    logging.info(f"Files in folder {folder_key}: {files}")
    return files


def delete_old_files_in_s3(object_key_prefix):
    """Delete a previous upload stored as ``object_key_prefix`` + any extension.

    Only keys that equal the prefix, or equal it once their final extension
    is stripped, are removed; other objects that merely share the prefix are
    left alone.
    """
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=object_key_prefix)
    for obj in response.get('Contents', []):
        key = obj['Key']
        if key == object_key_prefix or os.path.splitext(key)[0] == object_key_prefix:
            s3_client.delete_object(Bucket=bucket_name, Key=key)
            logging.info(f"Deleted old file from S3: {key}")


def generate_presigned_url(object_key, content_type, expiration=3600):
    """Return a presigned PUT URL for uploading ``object_key``.

    Args:
        object_key: Destination key inside the configured bucket.
        content_type: ``Content-Type`` the uploader must send.
        expiration: Lifetime of the URL in seconds.

    Returns:
        The URL, or ``None`` when it cannot be generated.
    """
    try:
        url = s3_client.generate_presigned_url(
            'put_object',
            Params={'Bucket': bucket_name, 'Key': object_key, 'ContentType': content_type},
            ExpiresIn=expiration
        )
        return url
    except Exception as e:
        logging.error(f"Failed to generate presigned URL: {str(e)}")
        return None


# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------

def get_db_connection():
    """Open a new MySQL connection using the ``MYSQL_*`` environment variables."""
    return mysql.connector.connect(
        host=os.getenv('MYSQL_HOST'),
        user=os.getenv('MYSQL_USER'),
        password=os.getenv('MYSQL_PASSWORD'),
        database=os.getenv('MYSQL_DB')
    )


@contextmanager
def _db_cursor(*, dictionary=False, commit=False):
    """Yield a cursor on a fresh connection and always close both afterwards.

    Args:
        dictionary: Create a dictionary cursor (rows as dicts).
        commit: Commit after the ``with`` body finishes without raising.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=dictionary)
        try:
            yield cursor
            if commit:
                conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()


def _fetch_all(query, params=()):
    """Run ``query`` and return every row as a dict."""
    with _db_cursor(dictionary=True) as cursor:
        cursor.execute(query, params)
        return cursor.fetchall()


def _fetch_one(query, params=()):
    """Run ``query`` and return the first row as a dict, or ``None``.

    The full result set is consumed so the cursor can be closed even when
    the query matches more than one row.
    """
    rows = _fetch_all(query, params)
    return rows[0] if rows else None


def _execute(query, params=()):
    """Run a data-modifying ``query`` and commit it."""
    with _db_cursor(commit=True) as cursor:
        cursor.execute(query, params)


def _password_matches(record, password, hasher):
    """Return True when ``record`` exists and ``password`` matches its hash."""
    if not record or not password:
        # A missing password must not reach bcrypt.
        return False
    return hasher.check_password_hash(record['password'], password)


def validate_student(student_id, password, bcrypt):
    """Return the student row when the id/password pair is valid, else ``None``."""
    student = _fetch_one('SELECT * FROM students WHERE id = %s', (student_id,))
    return student if _password_matches(student, password, bcrypt) else None


def validate_admin(username, password):
    """Return the administrator row when the credentials are valid, else ``None``.

    Uses the module-level ``bcrypt`` instance.
    """
    admin = _fetch_one('SELECT * FROM administrators WHERE username = %s', (username,))
    return admin if _password_matches(admin, password, bcrypt) else None


def validate_teacher(email, password, bcrypt):
    """Return the teacher row when the email/password pair is valid, else ``None``."""
    teacher = _fetch_one('SELECT * FROM teachers WHERE email = %s', (email,))
    return teacher if _password_matches(teacher, password, bcrypt) else None


def fetch_all_departments():
    """Return every row of ``departments``."""
    return _fetch_all('SELECT * FROM departments')


def fetch_all_grades():
    """Return every row of ``grades``."""
    return _fetch_all('SELECT * FROM grades')


def fetch_all_classes():
    """Return every row of ``classes``."""
    return _fetch_all('SELECT * FROM classes')


def fetch_all_teachers():
    """Return every row of ``teachers``."""
    return _fetch_all('SELECT * FROM teachers')


def fetch_teacher_classes(teacher_id):
    """Return ``id``/``name`` of the classes linked to a teacher via ``class_teacher``."""
    return _fetch_all('''
        SELECT c.id, c.name
        FROM classes c
        JOIN class_teacher ct ON c.id = ct.class_id
        WHERE ct.teacher_id = %s
    ''', (teacher_id,))


def fetch_class_assignments(class_id):
    """Return the assignments whose ``class_id`` matches."""
    pivot_query = '''
        SELECT * FROM assignments WHERE class_id = %s
    '''
    return _fetch_all(pivot_query, (class_id,))


def fetch_class_students(class_id):
    """Return the students whose ``class_id`` matches."""
    return _fetch_all('''
        SELECT * FROM students WHERE class_id = %s
    ''', (class_id,))


def add_to_submission_history(student_id, assignment_id, filename):
    """Append one row to ``submission_history`` stamped with the current time."""
    _execute(
        'INSERT INTO submission_history (student_id, assignment_id, filename, submit_date) VALUES (%s, %s, %s, NOW())',
        (student_id, assignment_id, filename)
    )


def add_or_update_submission(student_id, assignment_id, filename, code_verified=False):
    """Record a submission, always keeping a history entry.

    Args:
        student_id: Submitting student's id.
        assignment_id: Assignment being submitted.
        filename: Stored file name.
        code_verified: When True the existing ``submissions`` row is updated
            (a re-submission confirmed by e-mail code); otherwise a new row
            is inserted.
    """
    add_to_submission_history(student_id, assignment_id, filename)  # keep the audit trail

    if code_verified:
        _execute(
            'UPDATE submissions SET submit_date = NOW(), filename = %s WHERE student_id = %s AND assignment_id = %s',
            (filename, student_id, assignment_id))
    else:
        _execute(
            'INSERT INTO submissions (student_id, assignment_id, filename, submit_date) VALUES (%s, %s, %s, NOW())',
            (student_id, assignment_id, filename))


# ---------------------------------------------------------------------------
# Small shared helpers
# ---------------------------------------------------------------------------

def _generate_numeric_code(length=6):
    """Return a random numeric verification code from a CSPRNG."""
    return ''.join(secrets.choice('0123456789') for _ in range(length))


def _submission_stem(student_id, student_name, assignment):
    """Return the stored file name without extension.

    The name is built from the last two characters of the student id, the
    student name and the assignment identifier.
    """
    student_last_two_digits = str(student_id)[-2:]
    return f'{student_last_two_digits}_{student_name}_{assignment}'


def _excel_response(df, sheet_name, column_widths, download_name):
    """Serialise ``df`` to an in-memory .xlsx file and return it as a download.

    Args:
        df: DataFrame to export.
        sheet_name: Name of the single worksheet.
        column_widths: Mapping of column range (e.g. ``'A:A'``) to width.
        download_name: File name offered to the browser.
    """
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
        df.to_excel(writer, sheet_name=sheet_name, index=False)
        worksheet = writer.sheets[sheet_name]
        for column_range, width in column_widths.items():
            worksheet.set_column(column_range, width)
    output.seek(0)
    return send_file(
        output,
        as_attachment=True,
        download_name=download_name,
        mimetype=_XLSX_MIMETYPE
    )


def _build_cors_preflight_response():
    """Build an empty response carrying permissive CORS preflight headers."""
    response = make_response()
    response.headers.add("Access-Control-Allow-Origin", "*")
    response.headers.add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
    response.headers.add("Access-Control-Allow-Headers", "Content-Type")
    return response


# ---------------------------------------------------------------------------
# Administrator and student routes
# ---------------------------------------------------------------------------

def add_admin_routes(app, mail, bcrypt):
    """Register administrator views and the student-facing views on ``app``.

    Args:
        app: Flask application to attach the routes to.
        mail: Flask-Mail instance used to send verification codes.
        bcrypt: Flask-Bcrypt instance used to hash and check passwords.
    """

    def admin_required(f):
        """Redirect to the admin login page unless an admin is logged in."""
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if 'admin_id' not in session:
                return redirect(url_for('admin_login'))
            return f(*args, **kwargs)
        return decorated_function

    @app.route('/admin/add_department', methods=['POST'])
    @admin_required
    def add_department():
        name = request.form['name']
        _execute('INSERT INTO departments (name) VALUES (%s)', (name,))
        flash('Department added successfully', 'success')
        return redirect(url_for('admin_panel'))

    @app.route('/admin/add_grade', methods=['POST'])
    @admin_required
    def add_grade():
        year = request.form['year']
        _execute('INSERT INTO grades (year) VALUES (%s)', (year,))
        flash('Grade added successfully', 'success')
        return redirect(url_for('admin_panel'))

    @app.route('/admin/add_class', methods=['POST'])
    @admin_required
    def add_class():
        name = request.form['name']
        department_id = request.form['department_id']
        grade_id = request.form['grade_id']
        _execute('INSERT INTO classes (name, department_id, grade_id) VALUES (%s, %s, %s)',
                 (name, department_id, grade_id))
        flash('Class added successfully', 'success')
        return redirect(url_for('admin_panel'))

    @app.route('/admin/add_teacher', methods=['POST'])
    @admin_required
    def add_teacher():
        name = request.form['name']
        email = request.form['email']
        password = bcrypt.generate_password_hash(request.form['password']).decode('utf-8')
        _execute('INSERT INTO teachers (name, email, password) VALUES (%s, %s, %s)',
                 (name, email, password))
        flash('Teacher added successfully', 'success')
        return redirect(url_for('admin_panel'))

    @app.route('/admin/add_administrator', methods=['POST'])
    @admin_required
    def add_administrator():
        username = request.form['username']
        password = bcrypt.generate_password_hash(request.form['password']).decode('utf-8')
        teacher_id = request.form['teacher_id']
        _execute('INSERT INTO administrators (username, password, teacher_id) VALUES (%s, %s, %s)',
                 (username, password, teacher_id))
        flash('Administrator added successfully', 'success')
        return redirect(url_for('admin_panel'))

    @app.route('/admin/assign_teacher', methods=['POST'])
    @admin_required
    def assign_teacher():
        class_id = request.form['class_id']
        teacher_id = request.form['teacher_id']
        _execute('INSERT INTO class_teacher (class_id, teacher_id) VALUES (%s, %s)',
                 (class_id, teacher_id))
        flash('Teacher assigned to class successfully', 'success')
        return redirect(url_for('admin_panel'))

    @app.route('/admin/login', methods=['GET', 'POST'])
    def admin_login():
        if request.method == 'POST':
            username = request.form['username']
            password = request.form['password']
            admin = validate_admin(username, password)
            if admin:
                session['admin_id'] = admin['id']
                return redirect(url_for('admin_panel'))
            flash('Invalid credentials', 'error')
        return render_template('admin_login.html')

    @app.route('/admin/logout')
    def admin_logout():
        session.pop('admin_id', None)
        return redirect(url_for('admin_login'))

    @app.route('/admin/panel')
    @admin_required
    def admin_panel():
        # Everything the panel template needs to render its forms.
        departments = fetch_all_departments()
        grades = fetch_all_grades()
        classes = fetch_all_classes()
        teachers = fetch_all_teachers()
        return render_template('admin_panel.html', departments=departments, grades=grades,
                               classes=classes, teachers=teachers)

    @app.route('/admin/add_assignment', methods=['POST'])
    @admin_required
    def admin_add_assignment():
        value = request.form['value']
        name = request.form['name']
        deadline = request.form['deadline']
        _execute('INSERT INTO assignments (value, name, deadline) VALUES (%s, %s, %s)',
                 (value, name, deadline))
        flash('Assignment added successfully', 'success')
        return redirect(url_for('admin_panel'))

    # ----- student-facing views -------------------------------------------

    @app.route('/login', methods=['GET', 'POST'])
    def login():
        if request.method == 'POST':
            student_id = request.form.get('student_id')
            password = request.form.get('password')
            student = validate_student(student_id, password, bcrypt)
            if student:
                session['student_id'] = student['id']
                session['student_name'] = student['name']
                return redirect(url_for('serve_index'))
            return render_template('login.html', error='学号或密码错误')
        return render_template('login.html')

    @app.route('/reset-password', methods=['GET', 'POST'])
    def reset_password():
        """Two-step reset: request an e-mailed code, then submit it with a new password."""
        if request.method != 'POST':
            return render_template('reset_password.html')

        student_id = request.form.get('student_id')
        email = request.form.get('email')
        code = request.form.get('code')
        new_password = request.form.get('new_password')

        if code:
            code_ok = (session.get('reset_code') == code
                       and session.get('reset_student_id') == student_id)
            if not code_ok:
                return render_template('reset_password.html', error='验证码错误')
            if not new_password:
                # bcrypt cannot hash a missing password.
                return render_template('reset_password.html', error='新密码不能为空')

            hashed_password = bcrypt.generate_password_hash(new_password).decode('utf-8')
            _execute('UPDATE students SET password = %s WHERE id = %s',
                     (hashed_password, student_id))
            # A code is single-use: drop it so it cannot reset the password again.
            session.pop('reset_code', None)
            session.pop('reset_student_id', None)
            return render_template('login.html', success='密码已成功重置,请使用新密码登录')

        student = _fetch_one('SELECT * FROM students WHERE id = %s AND email = %s',
                             (student_id, email))
        if not student:
            return render_template('reset_password.html', error='学号和邮箱不匹配')

        reset_code = _generate_numeric_code()
        session['reset_code'] = reset_code
        session['reset_student_id'] = student_id
        try:
            msg = Message('重置密码验证码', recipients=[email])
            msg.body = f'您用于重置密码的验证码是: {reset_code}'
            mail.send(msg)
            return render_template('reset_password.html', success='验证码已发送到您的邮箱,请检查并输入验证码')
        except Exception as e:
            logging.error(f"Error sending email: {str(e)}")
            return render_template('reset_password.html', error='发送验证码失败,请稍后再试')

    @app.route('/record-submission', methods=['POST'])
    def record_submission():
        """Record a first-time submission and return a presigned upload URL.

        Responds 401 when the assignment was already submitted; the client
        must then go through ``/generate-code`` and ``/validate-code``.
        """
        data = request.get_json(silent=True) or {}
        student_id = session.get('student_id')
        student_name = session.get('student_name')
        assignment = data.get('assignment')
        filename = data.get('filename')  # original file name supplied by the front end

        if not student_id or not filename or not assignment:
            return jsonify({'error': '学号、作业和文件名是必要参数'}), 400

        submission = _fetch_one(
            'SELECT * FROM submissions WHERE student_id = %s AND assignment_id = %s',
            (student_id, assignment))

        if submission:
            # Remember the new file name for the follow-up /validate-code call.
            session['filename'] = filename
            return jsonify({'error': '作业已提交过,请输入验证码继续'}), 401

        # Keep the uploaded file's extension on the generated name.
        _, file_extension = os.path.splitext(filename)
        new_filename = f'{_submission_stem(student_id, student_name, assignment)}{file_extension}'
        folder_name = f'{S3_HOMEWORK_ROOT}/{assignment}'
        object_key = f'{folder_name}/{new_filename}'

        url = generate_presigned_url(object_key, 'application/octet-stream')
        if not url:
            logging.error("Failed to generate presigned URL")
            return jsonify({'error': '无法生成预签名 URL'}), 500

        add_or_update_submission(student_id, assignment, new_filename, code_verified=False)

        return jsonify({'status': 'success', 'upload_url': url})

    @app.route('/generate-code', methods=['POST'])
    def generate_code():
        """E-mail the logged-in student a code that authorises a re-submission."""
        student_id = session.get('student_id')
        assignment = (request.get_json(silent=True) or {}).get('assignment')

        student = _fetch_one('SELECT email FROM students WHERE id = %s', (student_id,))
        if not student:
            return jsonify({'error': '学生信息未找到'}), 404

        email = student['email']
        reset_code = _generate_numeric_code()
        session['submission_code'] = reset_code
        session['submission_student_id'] = student_id
        session['submission_assignment'] = assignment

        try:
            msg = Message('提交作业验证码', recipients=[email])
            msg.body = f'您用于提交作业的验证码是: {reset_code}'
            mail.send(msg)
            return jsonify({'status': '验证码已发送到您的邮箱,请检查并输入验证码'})
        except Exception as e:
            logging.error(f"Error sending email: {str(e)}")
            return jsonify({'error': '发送验证码失败,请稍后再试'}), 500

    @app.route('/validate-code', methods=['POST'])
    def validate_code():
        """Check the e-mailed code and, if valid, authorise replacing the upload."""
        request_data = request.get_json(silent=True) or {}
        student_id = session.get('student_id')
        assignment = request_data.get('assignment')
        code = request_data.get('code')

        # Require a real code and a logged-in student so that "missing"
        # values on both sides can never compare equal.
        code_ok = (bool(code) and bool(student_id)
                   and code == session.get('submission_code')
                   and student_id == session.get('submission_student_id')
                   and assignment == session.get('submission_assignment'))
        if not code_ok:
            return jsonify({'error': '验证码错误'}), 400

        # File name stored by /record-submission when it answered 401.
        filename = session.pop('filename', None)
        if not filename:
            return jsonify({'error': 'No file was found in session'}), 400

        _, file_extension = os.path.splitext(filename)
        stem = _submission_stem(student_id, session.get("student_name"), assignment)
        new_filename = f'{stem}{file_extension}'
        folder_name = f'{S3_HOMEWORK_ROOT}/{assignment}'

        # Same naming scheme as /record-submission, so the previous upload is
        # found here and the new key matches the file name stored in the DB.
        object_key_prefix = f'{folder_name}/{stem}'

        # Remove the previous upload regardless of its extension.
        delete_old_files_in_s3(object_key_prefix)

        object_key = f'{object_key_prefix}{file_extension}'
        logging.info(f"Generated object_key: {object_key}")

        url = generate_presigned_url(object_key, 'application/octet-stream')
        if not url:
            logging.error("Failed to generate presigned URL")
            return jsonify({'error': 'Failed to generate presigned URL'}), 500

        add_or_update_submission(student_id, assignment, new_filename, code_verified=True)

        # The code is single-use.
        session.pop('submission_code', None)
        session.pop('submission_student_id', None)
        session.pop('submission_assignment', None)

        session['validated_assignment'] = assignment
        session['validation_presigned_url'] = url

        return jsonify({'status': '成功', 'upload_url': url})

    # NOTE(review): the four reporting/export endpoints below have no login
    # check and return the raw exception text to the client; confirm whether
    # that exposure is intended before tightening it.

    @app.route('/api/submissions')
    def get_submissions():
        try:
            submissions = _fetch_all("""
                SELECT
                    sh.submit_date AS 时间,
                    sh.filename AS 提交的文件,
                    s.name AS 姓名,
                    s.id AS 学号,
                    sh.assignment_id AS 作业
                FROM submission_history sh
                JOIN students s ON sh.student_id = s.id
                ORDER BY sh.submit_date DESC
            """)

            # datetime objects are rendered as plain strings for the JSON payload.
            for sub in submissions:
                sub['时间'] = sub['时间'].strftime('%Y-%m-%d %H:%M:%S') if sub['时间'] else None

            return jsonify(submissions)
        except Exception as e:
            logging.error(f"Error in get_submissions: {str(e)}", exc_info=True)
            return jsonify({'error': str(e)}), 500

    @app.route('/api/assignment-status')
    def get_assignment_status():
        try:
            status_data = _fetch_all("""
                SELECT
                    s.id AS 学号,
                    s.name AS 姓名,
                    a.assignment_id AS 作业,
                    CASE WHEN sub.id IS NOT NULL THEN '已提交' ELSE '未提交' END AS 提交情况,
                    COALESCE(sub.filename, '') AS 提交的文件
                FROM students s
                CROSS JOIN (SELECT DISTINCT assignment_id FROM submissions) a
                LEFT JOIN submissions sub ON s.id = sub.student_id AND a.assignment_id = sub.assignment_id
                ORDER BY s.id, a.assignment_id
            """)
            return jsonify(status_data)
        except Exception as e:
            logging.error(f"Error in get_assignment_status: {str(e)}", exc_info=True)
            return jsonify({'error': str(e)}), 500

    @app.route('/download-submissions')
    def download_submissions():
        try:
            submissions = _fetch_all("""
                SELECT
                    sh.submit_date AS 时间,
                    sh.filename AS 提交的项目,
                    s.name AS 姓名,
                    s.id AS 学号,
                    sh.assignment_id AS 作业
                FROM submission_history sh
                JOIN students s ON sh.student_id = s.id
                ORDER BY sh.submit_date DESC
            """)

            # Naming the columns fixes their order and keeps an empty result
            # set from raising KeyError below.
            columns = ['时间', '提交的项目', '姓名', '学号', '作业']
            df = pd.DataFrame(submissions, columns=columns)
            df['时间'] = pd.to_datetime(df['时间']).dt.strftime('%Y-%m-%d %H:%M:%S')

            return _excel_response(
                df,
                sheet_name='提交记录',
                column_widths={'A:A': 20, 'B:B': 30, 'C:C': 15, 'D:D': 15, 'E:E': 20},
                download_name='submissions.xlsx'
            )
        except Exception as e:
            logging.error(f"Error in download_submissions: {str(e)}", exc_info=True)
            return jsonify({'error': str(e)}), 500

    @app.route('/download-assignment-status')
    def download_assignment_status():
        try:
            status_data = _fetch_all("""
                SELECT
                    s.id AS 学号,
                    s.name AS 姓名,
                    COALESCE(sub.assignment_id, 'No assignment') AS 作业,
                    CASE WHEN sub.id IS NOT NULL THEN '已提交' ELSE '未提交' END AS 作业提交情况,
                    COALESCE(sub.filename, '') AS 提交的项目,
                    COALESCE(sub.submit_date, '') AS 提交时间
                FROM students s
                LEFT JOIN submissions sub ON s.id = sub.student_id
                ORDER BY s.id, sub.assignment_id
            """)

            columns = ['学号', '姓名', '作业', '作业提交情况', '提交的项目', '提交时间']
            df = pd.DataFrame(status_data, columns=columns)
            df['提交时间'] = pd.to_datetime(df['提交时间']).dt.strftime('%Y-%m-%d %H:%M:%S')

            return _excel_response(
                df,
                sheet_name='作业提交情况',
                column_widths={'A:A': 15, 'B:B': 10, 'C:C': 15, 'D:D': 12, 'E:E': 30, 'F:F': 20},
                download_name='assignment_status.xlsx'
            )
        except Exception as e:
            logging.error(f"Error in download_assignment_status: {str(e)}", exc_info=True)
            return jsonify({'error': str(e)}), 500

    @app.route('/')
    def serve_index():
        if 'student_id' not in session:
            return redirect(url_for('login'))

        assignments = _fetch_all('SELECT * FROM assignments ORDER BY deadline')
        return render_template('index.html', assignments=assignments)

    @app.route('/logout')
    def logout():
        session.clear()
        return redirect(url_for('login'))


# ---------------------------------------------------------------------------
# Teacher routes
# ---------------------------------------------------------------------------

def add_teacher_routes(app, mail, bcrypt):
    """Register the teacher views on ``app``.

    Args:
        app: Flask application to attach the routes to.
        mail: Flask-Mail instance (not used by the views below).
        bcrypt: Flask-Bcrypt instance used to check teacher passwords.
    """

    def teacher_required(f):
        """Redirect to the teacher login page unless a teacher is logged in."""
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if 'teacher_id' not in session:
                return redirect(url_for('teacher_login'))
            return f(*args, **kwargs)
        return decorated_function

    @app.route('/teacher/login', methods=['GET', 'POST'])
    def teacher_login():
        if request.method == 'POST':
            email = request.form['email']
            password = request.form['password']
            teacher = validate_teacher(email, password, bcrypt)
            if teacher:
                session['teacher_id'] = teacher['id']
                return redirect(url_for('teacher_panel'))
            flash('Invalid credentials', 'error')
        return render_template('teacher_login.html')

    @app.route('/teacher/logout')
    def teacher_logout():
        session.pop('teacher_id', None)
        return redirect(url_for('teacher_login'))

    @app.route('/teacher/panel')
    @teacher_required
    def teacher_panel():
        # Only the classes this teacher is linked to are shown.
        teacher_id = session['teacher_id']
        classes = fetch_teacher_classes(teacher_id)
        return render_template('teacher_panel.html', classes=classes)

    # The URL variable is required: the view takes ``assignment_value``.
    @app.route('/teacher/download-assignment/<assignment_value>', methods=['GET'])
    @teacher_required
    def download_assignment(assignment_value):
        """Build a ZIP of one assignment's uploads and send it to the teacher."""
        folder_key = f"{S3_HOMEWORK_ROOT}/{assignment_value}/"

        zip_file_path = download_and_zip_files(os.getenv('S3_BUCKET_NAME'), folder_key)

        if not zip_file_path or not os.path.exists(zip_file_path):
            logging.error(f"Failed to create ZIP file at {zip_file_path}")
            return jsonify({"error": "无法创建 zip 文件"}), 500

        temp_dir = os.path.dirname(zip_file_path)
        try:
            # NOTE(review): recent Flask/Werkzeug versions appear to encode
            # non-ASCII download names themselves; confirm whether quote()
            # is still wanted here.
            filename = quote(f"{assignment_value}.zip")
            logging.info(f"Sending file: {zip_file_path} as {filename}")
            response = send_file(zip_file_path, as_attachment=True, download_name=filename,
                                 mimetype='application/zip')
        except Exception as ex:
            shutil.rmtree(temp_dir, ignore_errors=True)
            logging.error(f"Error sending file: {str(ex)}")
            return jsonify({"error": "Error sending file", "details": str(ex)}), 500

        if not app.config.get('USE_X_SENDFILE'):
            # Remove the temporary directory once the response has been sent.
            # With X-Sendfile the front-end server reads the file later, so
            # it must stay on disk.
            response.call_on_close(lambda: shutil.rmtree(temp_dir, ignore_errors=True))
        return response

    @app.route('/teacher/class/<class_id>')
    @teacher_required
    def view_class(class_id):
        assignments = fetch_class_assignments(class_id)
        students = fetch_class_students(class_id)
        return render_template('class_detail.html', assignments=assignments, students=students,
                               class_id=class_id)

    @app.route('/teacher/class/<class_id>/add_assignment', methods=['POST'])
    @teacher_required
    def teacher_add_assignment(class_id):
        value = request.form['value']
        name = request.form['name']
        deadline = request.form['deadline']
        _execute('INSERT INTO assignments (value, name, deadline, class_id) VALUES (%s, %s, %s, %s)',
                 (value, name, deadline, class_id))
        flash('Assignment added successfully', 'success')
        return redirect(url_for('view_class', class_id=class_id))

    @app.route('/teacher/edit_assignment/<assignment_id>', methods=['POST'])
    @teacher_required
    def edit_assignment(assignment_id):
        new_deadline = (request.get_json(silent=True) or {}).get('deadline')
        if not new_deadline:
            return jsonify({'error': 'Invalid deadline'}), 400

        _execute('UPDATE assignments SET deadline = %s WHERE id = %s',
                 (new_deadline, assignment_id))
        return jsonify({'status': 'success'})

    @app.route('/teacher/delete_assignment/<assignment_id>', methods=['DELETE'])
    @teacher_required
    def delete_assignment(assignment_id):
        _execute('DELETE FROM assignments WHERE id = %s', (assignment_id,))
        return jsonify({'status': 'success'})