"""Flask application for student login, password reset and homework submission.

Submissions are recorded in MySQL and uploaded by the client straight to S3
through a pre-signed URL. E-mail verification codes are issued for password
resets and for re-submitting an assignment that was already handed in.
"""

import csv
import datetime
import logging
import os
import random
import secrets
from contextlib import contextmanager

import boto3
import mysql.connector
from botocore.client import Config
from flask import Flask, request, jsonify, session, redirect, url_for, render_template
from flask_bcrypt import Bcrypt
from flask_mail import Mail, Message

from some_module_to_verify_code import verify_code  # assumes such a module exists

# Initialise Flask and its extensions.
app = Flask(__name__)

# Flask refuses to touch `session` without a secret key. Prefer a stable key
# from the environment; the random fallback keeps the app usable but
# invalidates sessions on restart and does not work across several workers.
if not app.secret_key:
    app.secret_key = os.getenv('FLASK_SECRET_KEY') or secrets.token_hex(32)

bcrypt = Bcrypt(app)
# NOTE(review): no MAIL_* settings are configured before this point, so
# Flask-Mail runs on its defaults -- confirm the mail server/sender config.
mail = Mail(app)

# Configure logging.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

# AWS S3 configuration, taken from the environment.
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
region_name = os.getenv('AWS_REGION')
bucket_name = os.getenv('S3_BUCKET_NAME')

# Initialise the S3 client.
s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name,
    config=Config(signature_version='s3v4')
)

# Create the CSV file with its header row on first start.
submissions_file = 'submissions.csv'
if not os.path.exists(submissions_file):
    # Explicit UTF-8: the header is non-ASCII and the locale default encoding
    # may be unable to represent it.
    with open(submissions_file, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(['ID', '学生姓名', '学号', '作业', '提交的文件'])


def get_db_connection():
    """Open and return a new MySQL connection.

    Connection settings are read from DB_HOST / DB_USER / DB_PASSWORD /
    DB_NAME; the previous hard-coded values remain as fallbacks so existing
    deployments keep working.
    """
    # NOTE(review): these fallback credentials are committed to source
    # control -- rotate them and drop the defaults once the environment
    # variables are in place.
    return mysql.connector.connect(
        host=os.getenv('DB_HOST', '8.218.165.242'),
        user=os.getenv('DB_USER', 'sure_001'),
        password=os.getenv('DB_PASSWORD', 'EKKWLMmrGmG7sdPf'),
        database=os.getenv('DB_NAME', 'sure_001')
    )


@contextmanager
def _db_cursor(dictionary=False, commit=False):
    """Yield a cursor on a fresh connection; always close cursor and connection.

    When ``commit`` is true the transaction is committed after the managed
    block finishes without raising.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=dictionary)
        try:
            yield cursor
            if commit:
                conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()


def _fetch_one(query, params=()):
    """Run ``query`` and return the first row as a dict, or None."""
    with _db_cursor(dictionary=True) as cursor:
        cursor.execute(query, params)
        return cursor.fetchone()


def _fetch_all(query, params=()):
    """Run ``query`` and return every row as a list of dicts."""
    with _db_cursor(dictionary=True) as cursor:
        cursor.execute(query, params)
        return cursor.fetchall()


def _generate_numeric_code(length=6):
    """Return a random numeric verification code drawn from a CSPRNG."""
    return ''.join(secrets.choice('0123456789') for _ in range(length))


def _json_payload():
    """Return the request's JSON object, or an empty dict if absent/invalid."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def validate_student(student_id, password):
    """Return the student row when the id/password pair is valid, else None."""
    # A missing field can never authenticate, and passing None to bcrypt
    # would raise instead of failing the login cleanly.
    if not student_id or not password:
        return None
    student = _fetch_one('SELECT * FROM students WHERE id = %s', (student_id,))
    if student and bcrypt.check_password_hash(student['password'], password):
        return student
    return None


def check_submission(student_id, assignment_id):
    """Return the existing submission row for this student/assignment, or None."""
    return _fetch_one(
        'SELECT * FROM submissions WHERE student_id = %s AND assignment_id = %s',
        (student_id, assignment_id))


def add_or_update_submission(student_id, assignment_id, filename, code_verified=False):
    """Insert a new submission, or update the existing one when ``code_verified``.

    With ``code_verified=True`` the existing row's filename and submit date
    are overwritten; otherwise a new row is inserted.
    """
    with _db_cursor(commit=True) as cursor:
        if code_verified:
            cursor.execute(
                'UPDATE submissions SET submit_date = NOW(), filename = %s WHERE student_id = %s AND assignment_id = %s',
                (filename, student_id, assignment_id))
        else:
            cursor.execute(
                'INSERT INTO submissions (student_id, assignment_id, filename, submit_date) VALUES (%s, %s, %s, NOW())',
                (student_id, assignment_id, filename))


def fetch_all_departments():
    """Return every row of the ``departments`` table."""
    return _fetch_all('SELECT * FROM departments')


def fetch_all_grades():
    """Return every row of the ``grades`` table."""
    return _fetch_all('SELECT * FROM grades')


def fetch_all_classes():
    """Return every row of the ``classes`` table."""
    return _fetch_all('SELECT * FROM classes')


def fetch_all_teachers():
    """Return every row of the ``teachers`` table."""
    return _fetch_all('SELECT * FROM teachers')


def generate_presigned_url(object_key, content_type):
    """Return a one-hour pre-signed S3 PUT URL for ``object_key``, or None on failure."""
    try:
        return s3_client.generate_presigned_url(
            'put_object',
            Params={'Bucket': bucket_name, 'Key': object_key, 'ContentType': content_type},
            ExpiresIn=3600
        )
    except Exception as e:
        logging.error("Failed to generate presigned URL: %s", e)
        return None


# Assignment submission route.
@app.route('/submit-assignment', methods=['POST'])
def submit_assignment():
    """Record a submission; a re-submission requires a valid verification code."""
    student_id = request.form.get('student_id')
    assignment_id = request.form.get('assignment_id')
    filename = request.form.get('filename')
    if not student_id or not assignment_id or not filename:
        return 'student_id, assignment_id and filename are required', 400

    if check_submission(student_id, assignment_id):
        # Already submitted once: overwriting needs the e-mailed code.
        email = request.form.get('email')
        code = request.form.get('code')
        if not verify_code(email, code):
            # Previously this fell through and still reported success
            # although nothing had been written.
            return 'Invalid verification code', 403
        add_or_update_submission(student_id, assignment_id, filename, code_verified=True)
    else:
        add_or_update_submission(student_id, assignment_id, filename, code_verified=False)
    return 'Submission successful'


# Login route.
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form (GET) or authenticate the student (POST)."""
    if request.method == 'POST':
        student_id = request.form.get('student_id')
        password = request.form.get('password')
        student = validate_student(student_id, password)
        if student:
            session['student_id'] = student['id']
            session['student_name'] = student['name']
            # NOTE(review): no 'serve_index' endpoint is defined in this
            # file -- confirm it is registered elsewhere, otherwise url_for
            # fails here.
            return redirect(url_for('serve_index'))
        return render_template('login.html', error='学号或密码错误')
    return render_template('login.html')


# Password reset route.
@app.route('/reset-password', methods=['GET', 'POST'])
def reset_password():
    """Render the reset form (GET) or e-mail a reset code (POST)."""
    if request.method != 'POST':
        return render_template('reset_password.html')

    student_id = request.form.get('student_id')
    email = request.form.get('email')
    student = _fetch_one(
        'SELECT * FROM students WHERE id = %s AND email = %s', (student_id, email))
    if not student:
        return render_template('reset_password.html', error='学号和邮箱不匹配')

    reset_code = _generate_numeric_code()
    # NOTE(review): Flask's default session cookie is signed but not
    # encrypted, so the client can read this code -- consider keeping it
    # server-side.
    session['reset_code'] = reset_code
    session['reset_student_id'] = student_id
    try:
        msg = Message('重置密码验证码', recipients=[email])
        msg.body = f'您用于重置密码的验证码是: {reset_code}'
        mail.send(msg)
    except Exception as e:
        logging.error("Error sending email: %s", e)
        return render_template('reset_password.html', error='发送验证码失败,请稍后再试')
    return render_template('reset_password.html', success='验证码已发送到您的邮箱,请检查并输入验证码')


# Submission record route.
@app.route('/record-submission', methods=['POST'])
def record_submission():
    """Record a first-time submission and return a pre-signed upload URL."""
    data = _json_payload()
    student_id = session.get('student_id')
    student_name = session.get('student_name')
    assignment = data.get('assignment')
    filename = data.get('filename')

    if not student_id or not filename or not assignment:
        return jsonify({'error': 'Student_id, assignment and filename parameters are required'}), 400

    # A second submission of the same assignment needs a verification code.
    if check_submission(student_id, assignment):
        session['filename'] = filename
        return jsonify({'error': '作业已提交过,需要验证码'}), 401

    # Build the object key, generate the pre-signed URL, then record the submission.
    new_filename = f'{student_id}_{student_name}_{assignment}'
    folder_name = f'sure_homework_define_by_qin/{assignment}'
    object_key = f'{folder_name}/{new_filename}'

    url = generate_presigned_url(object_key, 'application/octet-stream')
    if not url:
        logging.error("Failed to generate presigned URL")
        return jsonify({'error': 'Failed to generate presigned URL'}), 500

    add_or_update_submission(student_id, assignment, filename)
    return jsonify({'status': 'success', 'upload_url': url})


# Verification code route.
@app.route('/generate-code', methods=['POST'])
def generate_code():
    """E-mail a re-submission verification code to the logged-in student."""
    student_id = session.get('student_id')
    assignment = _json_payload().get('assignment')

    # Without a logged-in student there is nothing to look up; respond the
    # same way as for an unknown id.
    student = None
    if student_id is not None:
        student = _fetch_one('SELECT email FROM students WHERE id = %s', (student_id,))
    if not student:
        return jsonify({'error': '学生信息未找到'}), 404

    email = student['email']
    reset_code = _generate_numeric_code()
    # NOTE(review): the code is readable from the client-side session
    # cookie -- consider keeping it server-side.
    session['submission_code'] = reset_code
    session['submission_student_id'] = student_id
    session['submission_assignment'] = assignment

    try:
        msg = Message('提交验证码', recipients=[email])
        msg.body = f'您用于提交作业的验证码是: {reset_code}'
        mail.send(msg)
    except Exception as e:
        logging.error("Error sending email: %s", e)
        return jsonify({'error': '发送验证码失败,请稍后再试'}), 500
    return jsonify({'status': '验证码已发送到您的邮箱'})


if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it is
    # enabled only on explicit request (FLASK_DEBUG=1 or true).
    app.run(debug=os.getenv('FLASK_DEBUG', '').lower() in ('1', 'true'))