diff --git a/app_function.py b/app_function.py new file mode 100644 index 0000000..8237055 --- /dev/null +++ b/app_function.py @@ -0,0 +1,924 @@ +from flask import Flask, request, jsonify, redirect, url_for, render_template, session, make_response, flash +from flask_mail import Mail, Message +from flask_bcrypt import Bcrypt +import mysql.connector +import boto3 +from botocore.client import Config +import os +import logging +import random +import pandas as pd +from dotenv import load_dotenv +from flask import send_file +import io +from functools import wraps +import tempfile +import shutil +import zipfile +import time +from urllib.parse import quote + +load_dotenv() +bcrypt = Bcrypt() + +# 配置 AWS S3 +aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID') +aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY') +region_name = os.getenv('AWS_REGION') +bucket_name = os.getenv('S3_BUCKET_NAME') +s3_client = boto3.client( + 's3', + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=region_name, + config=Config(signature_version='s3v4') +) + +# 下载相关 +def download_and_zip_files(bucket_name, folder_key): + """下载文件并进行支持 ZIP64 的压缩打包""" + temp_dir = tempfile.mkdtemp() # 创建一个临时目录 + zip_file_path = os.path.join(temp_dir, 'homework.zip') + + try: + # 列出文件 + file_keys = list_files_in_folder(bucket_name, folder_key) + + if not file_keys: + logging.error(f"No files found in folder: {folder_key}") + return None + + # 下载文件 + for file_key in file_keys: + file_name = os.path.basename(file_key) + file_download_path = os.path.join(temp_dir, file_name) + + # 下载文件 + logging.info(f"Downloading {file_key} to {file_download_path}") + s3_client.download_file(bucket_name, file_key, file_download_path) + + # ZIP 压缩过程 + with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as zipf: + for root, _, files in os.walk(temp_dir): + for file in files: + file_path = os.path.join(root, file) + + # 排除自身的 ZIP 文件,防止自我压缩 + if file == os.path.basename(zip_file_path): + logging.info(f"Skipping self zip file: {file}") + continue + + # 打包其它文件 + arcname = os.path.relpath(file_path, start=temp_dir) + logging.info(f"Adding {file} to archive as {arcname}") + zipf.write(file_path, arcname=arcname) + + logging.info(f"Zip file created at {zip_file_path}") + return zip_file_path # 返回 ZIP 文件路径 + + except Exception as e: + logging.error(f"Error during download or zip operation: {e}") + return None + + +def upload_zip_to_s3(zip_file_path, bucket_name, object_key): + """上传压缩包文件到S3""" + try: + s3_client.upload_file(zip_file_path, bucket_name, object_key) + except ClientError as e: + logging.error(f"Error uploading zip to S3: {e}") + return None + + return generate_presigned_download_url(bucket_name, object_key) + +# 列出文件 +def list_files_in_folder(bucket_name, folder_key): + try: + response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_key) + if 'Contents' not in response: + logging.error(f"No contents found in S3 folder {folder_key}") + return [] + + files = [obj['Key'] for obj in response['Contents'] if obj['Key'] != folder_key] + logging.info(f"Files in folder {folder_key}: {files}") + return files + + except ClientError as e: + logging.error(f"Error listing files in folder {folder_key}: {e}") + return [] + +# 数据库连接函数 +def get_db_connection(): + return mysql.connector.connect( + host=os.getenv('MYSQL_HOST'), + user=os.getenv('MYSQL_USER'), + password=os.getenv('MYSQL_PASSWORD'), + database=os.getenv('MYSQL_DB') + ) + +# 验证学生身份 +def validate_student(student_id, password, bcrypt): + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute('SELECT * FROM students WHERE id = %s', (student_id,)) + student = cursor.fetchone() + cursor.close() + conn.close() + if student and bcrypt.check_password_hash(student['password'], password): + return student + return None + +# 验证管理员身份 +def validate_admin(username, password): + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute('SELECT * FROM administrators WHERE username = %s', (username,)) + admin = cursor.fetchone() + cursor.close() + conn.close() + + if admin and bcrypt.check_password_hash(admin['password'], password): + return admin + return None + +# 验证教师身份 +def validate_teacher(email, password, bcrypt): + conn = get_db_connection() # 或者其他方法来获取数据库连接 + cursor = conn.cursor(dictionary=True) + cursor.execute('SELECT * FROM teachers WHERE email = %s', (email,)) + teacher = cursor.fetchone() + cursor.close() + conn.close() + + if teacher and bcrypt.check_password_hash(teacher['password'], password): + return teacher + return None + +def fetch_all_departments(): + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute('SELECT * FROM departments') + departments = cursor.fetchall() + cursor.close() + conn.close() + return departments + +def fetch_all_grades(): + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute('SELECT * FROM grades') + grades = cursor.fetchall() + cursor.close() + conn.close() + return grades + +def fetch_all_classes(): + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute('SELECT * FROM classes') + classes = cursor.fetchall() + cursor.close() + conn.close() + return classes + +def fetch_all_teachers(): + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute('SELECT * FROM teachers') + teachers = cursor.fetchall() + cursor.close() + conn.close() + return teachers + +def fetch_teacher_classes(teacher_id): + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + + # 假设每个教师和班级的关联存储在一个中间表 `class_teacher` 中 + cursor.execute(''' + SELECT c.id, c.name FROM classes c + JOIN class_teacher ct ON c.id = ct.class_id + WHERE ct.teacher_id = %s + ''', (teacher_id,)) + + classes = cursor.fetchall() + + cursor.close() + conn.close() + return classes + +def fetch_class_assignments(class_id): + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + pivot_query = ''' + SELECT * FROM assignments + WHERE class_id = %s + ''' + cursor.execute(pivot_query, (class_id,)) + assignments = cursor.fetchall() + cursor.close() + conn.close() + return assignments + + +def fetch_class_students(class_id): + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + + # 查询属于给定class_id的所有学生 + cursor.execute(''' + SELECT * FROM students + WHERE class_id = %s + ''', (class_id,)) + + students = cursor.fetchall() + + cursor.close() + conn.close() + return students + + +# 添加到提交历史表 +def add_to_submission_history(student_id, assignment_id, filename): + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute( + 'INSERT INTO submission_history (student_id, assignment_id, filename, submit_date) VALUES (%s, %s, %s, NOW())', + (student_id, assignment_id, filename) + ) + conn.commit() + cursor.close() + conn.close() + +# 添加或更新作业提交记录 +def add_or_update_submission(student_id, assignment_id, filename, code_verified=False): + add_to_submission_history(student_id, assignment_id, filename) # 保留历史记录 + + conn = get_db_connection() + cursor = conn.cursor() + if code_verified: + cursor.execute( + 'UPDATE submissions SET submit_date = NOW(), filename = %s WHERE student_id = %s AND assignment_id = %s', + (filename, student_id, assignment_id)) + else: + cursor.execute( + 'INSERT INTO submissions (student_id, assignment_id, filename, submit_date) VALUES (%s, %s, %s, NOW())', + (student_id, assignment_id, filename)) + conn.commit() + cursor.close() + conn.close() + +# 生成预签名URL +def generate_presigned_url(object_key, content_type, expiration=3600): + try: + url = s3_client.generate_presigned_url( + 'put_object', + Params={'Bucket': bucket_name, 'Key': object_key, 'ContentType': content_type}, + ExpiresIn=expiration + ) + return url + except Exception as e: + logging.error(f"Failed to generate presigned URL: {str(e)}") + return None + +# 添加管理员路由 +def add_admin_routes(app, mail, bcrypt): + # 管理员登录装饰器 + def admin_required(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if 'admin_id' not in session: + return redirect(url_for('admin_login')) + return f(*args, **kwargs) + return decorated_function + + @app.route('/admin/add_department', methods=['POST']) + @admin_required + def add_department(): + name = request.form['name'] + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute('INSERT INTO departments (name) VALUES (%s)', (name,)) + conn.commit() + cursor.close() + conn.close() + flash('Department added successfully', 'success') + return redirect(url_for('admin_panel')) + + @app.route('/admin/add_grade', methods=['POST']) + @admin_required + def add_grade(): + year = request.form['year'] + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute('INSERT INTO grades (year) VALUES (%s)', (year,)) + conn.commit() + cursor.close() + conn.close() + flash('Grade added successfully', 'success') + return redirect(url_for('admin_panel')) + + @app.route('/admin/add_class', methods=['POST']) + @admin_required + def add_class(): + name = request.form['name'] + department_id = request.form['department_id'] + grade_id = request.form['grade_id'] + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute('INSERT INTO classes (name, department_id, grade_id) VALUES (%s, %s, %s)', + (name, department_id, grade_id)) + conn.commit() + cursor.close() + conn.close() + flash('Class added successfully', 'success') + return redirect(url_for('admin_panel')) + + @app.route('/admin/add_teacher', methods=['POST']) + @admin_required + def add_teacher(): + name = request.form['name'] + email = request.form['email'] + password = bcrypt.generate_password_hash(request.form['password']).decode('utf-8') + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute('INSERT INTO teachers (name, email, password) VALUES (%s, %s, %s)', + (name, email, password)) + conn.commit() + cursor.close() + conn.close() + flash('Teacher added successfully', 'success') + return redirect(url_for('admin_panel')) + + @app.route('/admin/add_administrator', methods=['POST']) + @admin_required + def add_administrator(): + username = request.form['username'] + password = bcrypt.generate_password_hash(request.form['password']).decode('utf-8') + teacher_id = request.form['teacher_id'] + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute('INSERT INTO administrators (username, password, teacher_id) VALUES (%s, %s, %s)', + (username, password, teacher_id)) + conn.commit() + cursor.close() + conn.close() + flash('Administrator added successfully', 'success') + return redirect(url_for('admin_panel')) + + @app.route('/admin/assign_teacher', methods=['POST']) + @admin_required + def assign_teacher(): + class_id = request.form['class_id'] + teacher_id = request.form['teacher_id'] + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute('INSERT INTO class_teacher (class_id, teacher_id) VALUES (%s, %s)', (class_id, teacher_id)) + conn.commit() + cursor.close() + conn.close() + flash('Teacher assigned to class successfully', 'success') + return redirect(url_for('admin_panel')) + + @app.route('/admin/login', methods=['GET', 'POST']) + def admin_login(): + if request.method == 'POST': + username = request.form['username'] + password = request.form['password'] + admin = validate_admin(username, password) + if admin: + session['admin_id'] = admin['id'] + return redirect(url_for('admin_panel')) + else: + flash('Invalid credentials', 'error') + return render_template('admin_login.html') + + @app.route('/admin/logout') + def admin_logout(): + session.pop('admin_id', None) + return redirect(url_for('admin_login')) + + @app.route('/admin/panel') + @admin_required + def admin_panel(): + # Retrieve necessary data (e.g., departments, grades, classes, etc.) + departments = fetch_all_departments() + grades = fetch_all_grades() + classes = fetch_all_classes() + teachers = fetch_all_teachers() + return render_template('admin_panel.html', departments=departments, grades=grades, classes=classes, teachers=teachers) + + @app.route('/admin/add_assignment', methods=['POST']) + @admin_required + def admin_add_assignment(): + value = request.form['value'] + name = request.form['name'] + deadline = request.form['deadline'] + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute('INSERT INTO assignments (value, name, deadline) VALUES (%s, %s, %s)', (value, name, deadline)) + conn.commit() + cursor.close() + conn.close() + flash('Assignment added successfully', 'success') + return redirect(url_for('admin_panel')) + + @app.route('/login', methods=['GET', 'POST']) + def login(): + if request.method == 'POST': + student_id = request.form.get('student_id') + password = request.form.get('password') + student = validate_student(student_id, password, bcrypt) + if student: + session['student_id'] = student['id'] + session['student_name'] = student['name'] + return redirect(url_for('serve_index')) + else: + return render_template('login.html', error='学号或密码错误') + return render_template('login.html') + + @app.route('/reset-password', methods=['GET', 'POST']) + def reset_password(): + if request.method == 'POST': + student_id = request.form.get('student_id') + email = request.form.get('email') + code = request.form.get('code') + new_password = request.form.get('new_password') + if code: + if session.get('reset_code') == code and session.get('reset_student_id') == student_id: + conn = get_db_connection() + cursor = conn.cursor() + hashed_password = bcrypt.generate_password_hash(new_password).decode('utf-8') + cursor.execute('UPDATE students SET password = %s WHERE id = %s', (hashed_password, student_id)) + conn.commit() + cursor.close() + conn.close() + return render_template('login.html', success='密码已成功重置,请使用新密码登录') + else: + return render_template('reset_password.html', error='验证码错误') + else: + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute('SELECT * FROM students WHERE id = %s AND email = %s', (student_id, email)) + student = cursor.fetchone() + cursor.close() + conn.close() + if student: + reset_code = ''.join(random.choices('0123456789', k=6)) + session['reset_code'] = reset_code + session['reset_student_id'] = student_id + try: + msg = Message('重置密码验证码', recipients=[email]) + msg.body = f'您用于重置密码的验证码是: {reset_code}' + mail.send(msg) + return render_template('reset_password.html', + success='验证码已发送到您的邮箱,请检查并输入验证码') + except Exception as e: + logging.error(f"Error sending email: {str(e)}") + return render_template('reset_password.html', error='发送验证码失败,请稍后再试') + else: + return render_template('reset_password.html', error='学号和邮箱不匹配') + return render_template('reset_password.html') + + @app.route('/record-submission', methods=['POST']) + def record_submission(): + data = request.json + student_id = session.get('student_id') + student_name = session.get('student_name') + assignment = data.get('assignment') + filename = data.get('filename') # 通过前端传递的文件名获取 + + if not student_id or not filename or not assignment: + return jsonify({'error': '学号、作业和文件名是必要参数'}), 400 + + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute('SELECT * FROM submissions WHERE student_id = %s AND assignment_id = %s', + (student_id, assignment)) + submission = cursor.fetchone() # 检查该学生是否已提交过此作业 + + cursor.close() + conn.close() + + if submission: + session['filename'] = filename # 保留新文件名以备`validate-code`使用 + return jsonify({'error': '作业已提交过,请输入验证码继续'}), 401 # 提示用户输入验证码以覆盖提交 + + # 提取当前上传文件的扩展名 + _, file_extension = os.path.splitext(filename) # 通过前端传递的文件提取扩展名 + + # 生成新的文件名,并确保文件扩展名保持正确 + new_filename = f'{student_id}_{student_name}_{assignment}{file_extension}' + folder_name = f'sure_homework_define_by_qin/{assignment}' + object_key = f'{folder_name}/{new_filename}' + + # 生成预签名 URL + url = generate_presigned_url(object_key, 'application/octet-stream') + + if not url: + logging.error("Failed to generate presigned URL") + return jsonify({'error': '无法生成预签名 URL'}), 500 + + # 保存提交记录 + add_or_update_submission(student_id, assignment, new_filename, code_verified=False) + + return jsonify({'status': 'success', 'upload_url': url}) + + @app.route('/generate-code', methods=['POST']) + def generate_code(): + student_id = session.get('student_id') + assignment = request.json.get('assignment') + + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute('SELECT email FROM students WHERE id = %s', (student_id,)) + student = cursor.fetchone() + cursor.close() + conn.close() + + if not student: + return jsonify({'error': '学生信息未找到'}), 404 + + email = student['email'] + reset_code = ''.join(random.choices('0123456789', k=6)) + session['submission_code'] = reset_code + session['submission_student_id'] = student_id + session['submission_assignment'] = assignment + + try: + msg = Message('提交作业验证码', recipients=[email]) + msg.body = f'您用于提交作业的验证码是: {reset_code}' + mail.send(msg) + return jsonify({'status': '验证码已发送到您的邮箱,请检查并输入验证码'}) + except Exception as e: + logging.error(f"Error sending email: {str(e)}") + return jsonify({'error': '发送验证码失败,请稍后再试'}), 500 + + @app.route('/validate-code', methods=['POST']) + def validate_code(): + request_data = request.json + student_id = session.get('student_id') + assignment = request_data.get('assignment') + code = request_data.get('code') + + # 验证提交的验证码与 session 中的值是否匹配 + if (code == session.get('submission_code') and + student_id == session.get('submission_student_id') and + assignment == session.get('submission_assignment')): + + # 从 session 中获取新文件名 + filename = session.pop('filename', None) + + if not filename: + return jsonify({'error': 'No file was found in session'}), 400 + + # 提取文件扩展名 + _, file_extension = os.path.splitext(filename) # 获取文件的后缀名 + + # 使用学号、学生名、作业名生成文件名(不含后缀) + new_filename = f'{student_id}_{session.get("student_name")}_{assignment}{file_extension}' + folder_name = f'sure_homework_define_by_qin/{assignment}' + + # 生成对象键前缀(不含扩展名),用于查找已存在的文件 + object_key_prefix = f'{folder_name}/{student_id}_{session.get("student_name")}_{assignment}' + + # 删除已存在的文件(任何扩展名) + delete_old_files_in_s3(object_key_prefix) + + # 再生成新的对象键(含有扩展名) + object_key = f'{object_key_prefix}{file_extension}' + + logging.info(f"Generated object_key: {object_key}") + + # 生成预签名 URL + url = generate_presigned_url(object_key, 'application/octet-stream') + + if not url: + logging.error("Failed to generate presigned URL") + return jsonify({'error': 'Failed to generate presigned URL'}), 500 + + # 更新数据库记录 + add_or_update_submission(student_id, assignment, new_filename, code_verified=True) + + # 清除 session 中的验证码和作业信息 + session.pop('submission_code', None) + session.pop('submission_student_id', None) + session.pop('submission_assignment', None) + + session['validated_assignment'] = assignment + session['validation_presigned_url'] = url + + return jsonify({'status': '成功', 'upload_url': url}) + else: + return jsonify({'error': '验证码错误'}), 400 + + # 辅助函数:删除 S3 存储桶中具有指定前缀的旧文件(无论扩展名是何) + def delete_old_files_in_s3(object_key_prefix): + # 查找存储桶中是否已经存在具有相同前缀(不同后缀)的文件 + response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=object_key_prefix) + + if 'Contents' in response: + for obj in response['Contents']: + s3_client.delete_object(Bucket=bucket_name, Key=obj['Key']) + logging.info(f"Deleted old file from S3: {obj['Key']}") + + @app.route('/api/submissions') + def get_submissions(): + try: + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute(""" + SELECT + sh.submit_date AS 时间, + sh.filename AS 提交的文件, + s.name AS 姓名, + s.id AS 学号, + sh.assignment_id AS 作业 + FROM submission_history sh + JOIN students s ON sh.student_id = s.id + ORDER BY sh.submit_date DESC + """) + submissions = cursor.fetchall() + cursor.close() + conn.close() + + # 格式化日期时间 + for sub in submissions: + sub['时间'] = sub['时间'].strftime('%Y-%m-%d %H:%M:%S') if sub['时间'] else None + + return jsonify(submissions) + except Exception as e: + logging.error(f"Error in get_submissions: {str(e)}", exc_info=True) + return jsonify({'error': str(e)}), 500 + + @app.route('/api/assignment-status') + def get_assignment_status(): + try: + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute(""" + SELECT + s.id AS 学号, + s.name AS 姓名, + a.assignment_id AS 作业, + CASE WHEN sub.id IS NOT NULL THEN '已提交' ELSE '未提交' END AS 提交情况, + COALESCE(sub.filename, '') AS 提交的文件 + FROM students s + CROSS JOIN (SELECT DISTINCT assignment_id FROM submissions) a + LEFT JOIN submissions sub ON s.id = sub.student_id AND a.assignment_id = sub.assignment_id + ORDER BY s.id, a.assignment_id + """) + status_data = cursor.fetchall() + cursor.close() + conn.close() + + return jsonify(status_data) + except Exception as e: + logging.error(f"Error in get_assignment_status: {str(e)}", exc_info=True) + return jsonify({'error': str(e)}), 500 + + @app.route('/download-submissions') + def download_submissions(): + try: + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute(""" + SELECT + sh.submit_date AS 时间, + sh.filename AS 提交的项目, + s.name AS 姓名, + s.id AS 学号, + sh.assignment_id AS 作业 + FROM submission_history sh + JOIN students s ON sh.student_id = s.id + ORDER BY sh.submit_date DESC + """) + submissions = cursor.fetchall() + cursor.close() + conn.close() + + # 创建 DataFrame + df = pd.DataFrame(submissions) + + # 格式化日期时间 + df['时间'] = pd.to_datetime(df['时间']).dt.strftime('%Y-%m-%d %H:%M:%S') + + # 确保列的顺序 + df = df[['时间', '提交的项目', '姓名', '学号', '作业']] + + # 创建一个 BytesIO 对象,用于存储 Excel 文件 + output = io.BytesIO() + + # 使用 ExcelWriter 来设置列宽 + with pd.ExcelWriter(output, engine='xlsxwriter') as writer: + df.to_excel(writer, sheet_name='提交记录', index=False) + worksheet = writer.sheets['提交记录'] + + # 设置列宽 + worksheet.set_column('A:A', 20) # 时间 + worksheet.set_column('B:B', 30) # 提交的项目 + worksheet.set_column('C:C', 15) # 姓名 + worksheet.set_column('D:D', 15) # 学号 + worksheet.set_column('E:E', 20) # 作业 + + output.seek(0) + + return send_file( + output, + as_attachment=True, + download_name='submissions.xlsx', + mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' + ) + except Exception as e: + logging.error(f"Error in download_submissions: {str(e)}", exc_info=True) + return jsonify({'error': str(e)}), 500 + + @app.route('/download-assignment-status') + def download_assignment_status(): + try: + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute(""" + SELECT + s.id AS 学号, + s.name AS 姓名, + COALESCE(sub.assignment_id, 'No assignment') AS 作业, + CASE WHEN sub.id IS NOT NULL THEN '已提交' ELSE '未提交' END AS 作业提交情况, + COALESCE(sub.filename, '') AS 提交的项目, + COALESCE(sub.submit_date, '') AS 提交时间 + FROM students s + LEFT JOIN submissions sub ON s.id = sub.student_id + ORDER BY s.id, sub.assignment_id + """) + status_data = cursor.fetchall() + cursor.close() + conn.close() + + df = pd.DataFrame(status_data) + + # 格式化日期时间 + df['提交时间'] = pd.to_datetime(df['提交时间']).dt.strftime('%Y-%m-%d %H:%M:%S') + + # 确保列的顺序 + df = df[['学号', '姓名', '作业', '作业提交情况', '提交的项目', '提交时间']] + + output = io.BytesIO() + with pd.ExcelWriter(output, engine='xlsxwriter') as writer: + df.to_excel(writer, sheet_name='作业提交情况', index=False) + worksheet = writer.sheets['作业提交情况'] + + # 设置列宽 + worksheet.set_column('A:A', 15) # 学号 + worksheet.set_column('B:B', 10) # 姓名 + worksheet.set_column('C:C', 15) # 作业 + worksheet.set_column('D:D', 12) # 作业提交情况 + worksheet.set_column('E:E', 30) # 提交的项目 + worksheet.set_column('F:F', 20) # 提交时间 + + output.seek(0) + + return send_file( + output, + as_attachment=True, + download_name='assignment_status.xlsx', + mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' + ) + except Exception as e: + logging.error(f"Error in download_assignment_status: {str(e)}", exc_info=True) + return jsonify({'error': str(e)}), 500 + + @app.route('/') + def serve_index(): + if 'student_id' not in session: + return redirect(url_for('login')) + conn = get_db_connection() + cursor = conn.cursor(dictionary=True) + cursor.execute('SELECT * FROM assignments ORDER BY deadline') + assignments = cursor.fetchall() + cursor.close() + conn.close() + return render_template('index.html', assignments=assignments) + + @app.route('/logout') + def logout(): + session.clear() + return redirect(url_for('login')) + + def _build_cors_preflight_response(): + response = make_response() + response.headers.add("Access-Control-Allow-Origin", "*") + response.headers.add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + response.headers.add("Access-Control-Allow-Headers", "Content-Type") + return response + +# 教师管理路由 +def add_teacher_routes(app, mail, bcrypt): + # 教师登录装饰器 + def teacher_required(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if 'teacher_id' not in session: + return redirect(url_for('teacher_login')) + return f(*args, **kwargs) + return decorated_function + + # Teacher Login Route + @app.route('/teacher/login', methods=['GET', 'POST']) + def teacher_login(): + if request.method == 'POST': + email = request.form['email'] + password = request.form['password'] + teacher = validate_teacher(email, password, bcrypt) + if teacher: + session['teacher_id'] = teacher['id'] + return redirect(url_for('teacher_panel')) + else: + flash('Invalid credentials', 'error') + return render_template('teacher_login.html') + + @app.route('/teacher/logout') + def teacher_logout(): + session.pop('teacher_id', None) + return redirect(url_for('teacher_login')) + + @app.route('/teacher/panel') + @teacher_required + def teacher_panel(): + # Fetch the classes this teacher is responsible for + teacher_id = session['teacher_id'] + classes = fetch_teacher_classes(teacher_id) + return render_template('teacher_panel.html', classes=classes) + + @app.route('/teacher/download-assignment/', methods=['GET']) + def download_assignment(assignment_value): + """生成 ZIP 并直接发送给用户""" + teacher_id = session.get('teacher_id') + if not teacher_id: + return redirect(url_for('teacher_login')) + + # 构造 S3 文件夹路径 + folder_key = f"sure_homework_define_by_qin/{assignment_value}/" + + # 下载并压缩作业文件夹 + zip_file_path = download_and_zip_files(os.getenv('S3_BUCKET_NAME'), folder_key) + + if not zip_file_path or not os.path.exists(zip_file_path): + logging.error(f"Failed to create ZIP file at {zip_file_path}") + return jsonify({"error": "无法创建 zip 文件"}), 500 + + try: + # 将来自 assignment_value 的文件名进行 URL 编码,以支持中文和特殊字符 + filename = quote(f"{assignment_value}.zip") + + logging.info(f"Sending file: {zip_file_path} as {filename}") + return send_file(zip_file_path, as_attachment=True, download_name=filename, mimetype='application/zip') + except Exception as ex: + logging.error(f"Error sending file: {str(ex)}") + return jsonify({"error": "Error sending file", "details": str(ex)}), 500 + + @app.route('/teacher/class/') + @teacher_required + def view_class(class_id): + # Fetch the assignments and students of the class + assignments = fetch_class_assignments(class_id) + students = fetch_class_students(class_id) + return render_template('class_detail.html', assignments=assignments, students=students, class_id=class_id) + + @app.route('/teacher/class//add_assignment', methods=['POST']) + @teacher_required + def teacher_add_assignment(class_id): + # Adding a new assignment to the class + value = request.form['value'] + name = request.form['name'] + deadline = request.form['deadline'] + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute('INSERT INTO assignments (value, name, deadline, class_id) VALUES (%s, %s, %s, %s)', (value, name, deadline, class_id)) + conn.commit() + cursor.close() + conn.close() + flash('Assignment added successfully', 'success') + return redirect(url_for('view_class', class_id=class_id)) + + @app.route('/teacher/edit_assignment/', methods=['POST']) + def edit_assignment(assignment_id): + new_deadline = request.json.get('deadline') + if new_deadline: + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute('UPDATE assignments SET deadline = %s WHERE id = %s', (new_deadline, assignment_id)) + conn.commit() + cursor.close() + conn.close() + return jsonify({'status': 'success'}) + return jsonify({'error': 'Invalid deadline'}), 400 + + @app.route('/teacher/delete_assignment/', methods=['DELETE']) + def delete_assignment(assignment_id): + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute('DELETE FROM assignments WHERE id = %s', (assignment_id,)) + conn.commit() + cursor.close() + conn.close() + return jsonify({'status': 'success'}) \ No newline at end of file