superlishunqin 68b99755ec ALL
2024-11-14 15:46:37 +08:00

281 lines
9.2 KiB
Python

import os
import mysql.connector
from flask import Flask, request, jsonify, session, redirect, url_for, render_template
from flask_bcrypt import Bcrypt
from some_module_to_verify_code import verify_code # 假设你有相应的模块
import datetime
import random
import logging
from flask_mail import Mail, Message
import boto3
from botocore.client import Config
import csv
# 初始化 Flask 和 Bcrypt
app = Flask(__name__)
bcrypt = Bcrypt(app)
mail = Mail(app)
# 配置日志
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# 配置 AWS S3
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
region_name = os.getenv('AWS_REGION')
bucket_name = os.getenv('S3_BUCKET_NAME')
# 初始化S3客户端
s3_client = boto3.client(
's3',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name,
config=Config(signature_version='s3v4')
)
# 初始化 CSV 文件
submissions_file = 'submissions.csv'
if not os.path.exists(submissions_file):
with open(submissions_file, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(['ID', '学生姓名', '学号', '作业', '提交的文件'])
# 数据库连接函数
def get_db_connection():
return mysql.connector.connect(
host='8.218.165.242',
user='sure_001',
password='EKKWLMmrGmG7sdPf',
database='sure_001'
)
# 验证学生身份
def validate_student(student_id, password):
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute('SELECT * FROM students WHERE id = %s', (student_id,))
student = cursor.fetchone()
cursor.close()
conn.close()
if student and bcrypt.check_password_hash(student['password'], password):
return student
return None
# 检查作业提交
def check_submission(student_id, assignment_id):
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute('SELECT * FROM submissions WHERE student_id = %s AND assignment_id = %s',
(student_id, assignment_id))
submission = cursor.fetchone()
cursor.close()
conn.close()
return submission
# 添加或更新作业提交
def add_or_update_submission(student_id, assignment_id, filename, code_verified=False):
conn = get_db_connection()
cursor = conn.cursor()
if code_verified:
cursor.execute(
'UPDATE submissions SET submit_date = NOW(), filename = %s WHERE student_id = %s AND assignment_id = %s',
(filename, student_id, assignment_id))
else:
cursor.execute(
'INSERT INTO submissions (student_id, assignment_id, filename, submit_date) VALUES (%s, %s, %s, NOW())',
(student_id, assignment_id, filename))
conn.commit()
cursor.close()
conn.close()
def fetch_all_departments():
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute('SELECT * FROM departments')
departments = cursor.fetchall()
cursor.close()
conn.close()
return departments
def fetch_all_grades():
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute('SELECT * FROM grades')
grades = cursor.fetchall()
cursor.close()
conn.close()
return grades
def fetch_all_classes():
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute('SELECT * FROM classes')
classes = cursor.fetchall()
cursor.close()
conn.close()
return classes
def fetch_all_teachers():
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute('SELECT * FROM teachers')
teachers = cursor.fetchall()
cursor.close()
conn.close()
return teachers
# 生成预签名URL
def generate_presigned_url(object_key, content_type):
try:
url = s3_client.generate_presigned_url(
'put_object',
Params={'Bucket': bucket_name, 'Key': object_key, 'ContentType': content_type},
ExpiresIn=3600
)
return url
except Exception as e:
logging.error(f"Failed to generate presigned URL: {str(e)}")
return None
# 提交作业路由
@app.route('/submit-assignment', methods=['POST'])
def submit_assignment():
student_id = request.form.get('student_id')
assignment_id = request.form.get('assignment_id')
filename = request.form.get('filename')
submission = check_submission(student_id, assignment_id)
if submission:
# 要求用户输入验证码
email = request.form.get('email')
code = request.form.get('code')
if verify_code(email, code):
add_or_update_submission(student_id, assignment_id, filename, code_verified=True)
else:
add_or_update_submission(student_id, assignment_id, filename, code_verified=False)
return 'Submission successful'
# 登录路由
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
student_id = request.form.get('student_id')
password = request.form.get('password')
student = validate_student(student_id, password)
if student:
session['student_id'] = student['id']
session['student_name'] = student['name']
return redirect(url_for('serve_index'))
else:
return render_template('login.html', error='学号或密码错误')
return render_template('login.html')
# 重置密码路由
@app.route('/reset-password', methods=['GET', 'POST'])
def reset_password():
if request.method == 'POST':
student_id = request.form.get('student_id')
email = request.form.get('email')
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute('SELECT * FROM students WHERE id = %s AND email = %s', (student_id, email))
student = cursor.fetchone()
cursor.close()
conn.close()
if student:
reset_code = ''.join(random.choices('0123456789', k=6))
session['reset_code'] = reset_code
session['reset_student_id'] = student_id
try:
msg = Message('重置密码验证码', recipients=[email])
msg.body = f'您用于重置密码的验证码是: {reset_code}'
mail.send(msg)
return render_template('reset_password.html', success='验证码已发送到您的邮箱,请检查并输入验证码')
except Exception as e:
logging.error(f"Error sending email: {str(e)}")
return render_template('reset_password.html', error='发送验证码失败,请稍后再试')
else:
return render_template('reset_password.html', error='学号和邮箱不匹配')
return render_template('reset_password.html')
# 提交记录路由
@app.route('/record-submission', methods=['POST'])
def record_submission():
data = request.json
student_id = session.get('student_id')
student_name = session.get('student_name')
assignment = data.get('assignment')
filename = data.get('filename')
if not student_id or not filename or not assignment:
return jsonify({'error': 'Student_id, assignment and filename parameters are required'}), 400
# Check if the student has already submitted this assignment
submission = check_submission(student_id, assignment)
if submission:
session['filename'] = filename
return jsonify({'error': '作业已提交过,需要验证码'}), 401
# 生成 pre-signed URL 并记录提交
new_filename = f'{student_id}_{student_name}_{assignment}'
folder_name = f'sure_homework_define_by_qin/{assignment}'
object_key = f'{folder_name}/{new_filename}'
url = generate_presigned_url(object_key, 'application/octet-stream')
if not url:
logging.error("Failed to generate presigned URL")
return jsonify({'error': 'Failed to generate presigned URL'}), 500
add_or_update_submission(student_id, assignment, filename)
return jsonify({'status': 'success', 'upload_url': url})
# 生成验证码路由
@app.route('/generate-code', methods=['POST'])
def generate_code():
student_id = session.get('student_id')
assignment = request.json.get('assignment')
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute('SELECT email FROM students WHERE id = %s', (student_id,))
student = cursor.fetchone()
cursor.close()
conn.close()
if not student:
return jsonify({'error': '学生信息未找到'}), 404
email = student['email']
reset_code = ''.join(random.choices('0123456789', k=6))
session['submission_code'] = reset_code
session['submission_student_id'] = student_id
session['submission_assignment'] = assignment
try:
msg = Message('提交验证码', recipients=[email])
msg.body = f'您用于提交作业的验证码是: {reset_code}'
mail.send(msg)
return jsonify({'status': '验证码已发送到您的邮箱'})
except Exception as e:
logging.error(f"Error sending email: {str(e)}")
return jsonify({'error': '发送验证码失败,请稍后再试'}), 500
if __name__ == '__main__':
app.run(debug=True)