222 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			222 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# app/services/user_service.py
 | 
						|
 | 
						|
from app.models.user import User, Role, db
 | 
						|
from sqlalchemy import or_
 | 
						|
from datetime import datetime
 | 
						|
from sqlalchemy import text
 | 
						|
 | 
						|
class UserService:
 | 
						|
    @staticmethod
 | 
						|
    def get_users(page=1, per_page=10, search_query=None, status=None, role_id=None):
 | 
						|
        """
 | 
						|
        获取用户列表,支持分页、搜索和过滤
 | 
						|
        """
 | 
						|
        query = User.query
 | 
						|
 | 
						|
        # 搜索条件
 | 
						|
        if search_query:
 | 
						|
            query = query.filter(or_(
 | 
						|
                User.username.like(f'%{search_query}%'),
 | 
						|
                User.email.like(f'%{search_query}%'),
 | 
						|
                User.nickname.like(f'%{search_query}%'),
 | 
						|
                User.phone.like(f'%{search_query}%')
 | 
						|
            ))
 | 
						|
 | 
						|
        # 状态过滤
 | 
						|
        if status is not None:
 | 
						|
            query = query.filter(User.status == status)
 | 
						|
 | 
						|
        # 角色过滤
 | 
						|
        if role_id is not None:
 | 
						|
            query = query.filter(User.role_id == role_id)
 | 
						|
 | 
						|
        # 分页
 | 
						|
        pagination = query.order_by(User.id.desc()).paginate(
 | 
						|
            page=page, per_page=per_page, error_out=False
 | 
						|
        )
 | 
						|
 | 
						|
        return pagination
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_user_by_id(user_id):
 | 
						|
        """通过ID获取用户"""
 | 
						|
        return User.query.get(user_id)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def update_user(user_id, data):
 | 
						|
        """更新用户信息"""
 | 
						|
        user = User.query.get(user_id)
 | 
						|
        if not user:
 | 
						|
            return False, "用户不存在"
 | 
						|
 | 
						|
        try:
 | 
						|
            # 更新可编辑字段
 | 
						|
            if 'email' in data:
 | 
						|
                # 检查邮箱是否已被其他用户使用
 | 
						|
                existing = User.query.filter(User.email == data['email'], User.id != user_id).first()
 | 
						|
                if existing:
 | 
						|
                    return False, "邮箱已被使用"
 | 
						|
                user.email = data['email']
 | 
						|
 | 
						|
            if 'phone' in data:
 | 
						|
                # 检查手机号是否已被其他用户使用
 | 
						|
                existing = User.query.filter(User.phone == data['phone'], User.id != user_id).first()
 | 
						|
                if existing:
 | 
						|
                    return False, "手机号已被使用"
 | 
						|
                user.phone = data['phone']
 | 
						|
 | 
						|
            if 'nickname' in data and data['nickname']:
 | 
						|
                user.nickname = data['nickname']
 | 
						|
 | 
						|
            # 只有管理员可以修改这些字段
 | 
						|
            if 'role_id' in data:
 | 
						|
                user.role_id = data['role_id']
 | 
						|
 | 
						|
            if 'status' in data:
 | 
						|
                user.status = data['status']
 | 
						|
 | 
						|
            if 'password' in data and data['password']:
 | 
						|
                user.set_password(data['password'])
 | 
						|
 | 
						|
            user.updated_at = datetime.now()
 | 
						|
            db.session.commit()
 | 
						|
            return True, "用户信息更新成功"
 | 
						|
 | 
						|
        except Exception as e:
 | 
						|
            db.session.rollback()
 | 
						|
            return False, f"更新失败: {str(e)}"
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def change_user_status(user_id, status):
 | 
						|
        """变更用户状态 (启用/禁用)"""
 | 
						|
        user = User.query.get(user_id)
 | 
						|
        if not user:
 | 
						|
            return False, "用户不存在"
 | 
						|
 | 
						|
        try:
 | 
						|
            user.status = status
 | 
						|
            user.updated_at = datetime.now()
 | 
						|
            db.session.commit()
 | 
						|
            status_text = "启用" if status == 1 else "禁用"
 | 
						|
            return True, f"用户已{status_text}"
 | 
						|
        except Exception as e:
 | 
						|
            db.session.rollback()
 | 
						|
            return False, f"状态变更失败: {str(e)}"
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def delete_user(user_id):
 | 
						|
        """删除用户 (物理删除)"""
 | 
						|
        user = User.query.get(user_id)
 | 
						|
        if not user:
 | 
						|
            return False, "用户不存在"
 | 
						|
 | 
						|
        try:
 | 
						|
            # 检查是否有未归还的图书 - 使用SQLAlchemy的text函数
 | 
						|
            active_borrows_count = db.session.execute(
 | 
						|
                text("SELECT COUNT(*) FROM borrow_records WHERE user_id = :user_id AND return_date IS NULL"),
 | 
						|
                {"user_id": user_id}
 | 
						|
            ).scalar()
 | 
						|
 | 
						|
            if active_borrows_count > 0:
 | 
						|
                return False, f"无法删除:该用户还有 {active_borrows_count} 本未归还的图书"
 | 
						|
 | 
						|
            # 删除用户相关的通知记录 - 使用SQLAlchemy的text函数
 | 
						|
            db.session.execute(
 | 
						|
                text("DELETE FROM notifications WHERE user_id = :user_id OR sender_id = :user_id"),
 | 
						|
                {"user_id": user_id}
 | 
						|
            )
 | 
						|
 | 
						|
            # 物理删除用户
 | 
						|
            db.session.delete(user)
 | 
						|
            db.session.commit()
 | 
						|
            return True, "用户已永久删除"
 | 
						|
        except Exception as e:
 | 
						|
            db.session.rollback()
 | 
						|
            return False, f"删除失败: {str(e)}"
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_all_roles():
 | 
						|
        """获取所有角色"""
 | 
						|
        return Role.query.all()
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def create_role(role_name, description, permission_ids=None):
 | 
						|
        """创建新角色,包括权限分配"""
 | 
						|
        from app.models.user import Role
 | 
						|
        from app.models.permission import Permission
 | 
						|
 | 
						|
        # 检查角色名是否已存在
 | 
						|
        if Role.query.filter_by(role_name=role_name).first():
 | 
						|
            return False, "角色名称已存在", None
 | 
						|
 | 
						|
        role = Role(role_name=role_name, description=description)
 | 
						|
 | 
						|
        # 添加权限
 | 
						|
        if permission_ids:
 | 
						|
            permissions = Permission.query.filter(Permission.id.in_(permission_ids)).all()
 | 
						|
            role.permissions = permissions
 | 
						|
 | 
						|
        try:
 | 
						|
            db.session.add(role)
 | 
						|
            db.session.commit()
 | 
						|
            return True, "角色创建成功", role.id
 | 
						|
        except Exception as e:
 | 
						|
            db.session.rollback()
 | 
						|
            return False, f"创建失败: {str(e)}", None
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def update_role(role_id, role_name, description, permission_ids=None):
 | 
						|
        """更新角色信息,包括权限"""
 | 
						|
        from app.models.user import Role
 | 
						|
        from app.models.permission import Permission
 | 
						|
 | 
						|
        role = Role.query.get(role_id)
 | 
						|
        if not role:
 | 
						|
            return False, "角色不存在"
 | 
						|
 | 
						|
        # 系统内置角色不允许修改名称
 | 
						|
        if role_id in [1, 2] and role.role_name != role_name:
 | 
						|
            return False, "系统内置角色不允许修改名称"
 | 
						|
 | 
						|
        # 检查角色名是否已存在
 | 
						|
        existing_role = Role.query.filter(Role.role_name == role_name, Role.id != role_id).first()
 | 
						|
        if existing_role:
 | 
						|
            return False, "角色名称已存在"
 | 
						|
 | 
						|
        role.role_name = role_name
 | 
						|
        role.description = description
 | 
						|
 | 
						|
        # 更新权限(如果提供了权限列表且不是内置角色)
 | 
						|
        if permission_ids is not None and role_id not in [1, 2]:
 | 
						|
            permissions = Permission.query.filter(Permission.id.in_(permission_ids)).all()
 | 
						|
            role.permissions = permissions
 | 
						|
 | 
						|
        try:
 | 
						|
            db.session.commit()
 | 
						|
            return True, "角色更新成功"
 | 
						|
        except Exception as e:
 | 
						|
            db.session.rollback()
 | 
						|
            return False, f"更新失败: {str(e)}"
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def create_user(data):
 | 
						|
        """创建新用户"""
 | 
						|
        try:
 | 
						|
            new_user = User(
 | 
						|
                username=data['username'],
 | 
						|
                password=data['password'],
 | 
						|
                email=data['email'],
 | 
						|
                nickname=data.get('nickname') or data['username'],
 | 
						|
                phone=data.get('phone'),
 | 
						|
                role_id=data.get('role_id', 2),  # 默认为普通用户
 | 
						|
                status=data.get('status', 1)  # 默认为启用状态
 | 
						|
            )
 | 
						|
            db.session.add(new_user)
 | 
						|
            db.session.commit()
 | 
						|
            return True, '用户创建成功'
 | 
						|
        except Exception as e:
 | 
						|
            db.session.rollback()
 | 
						|
            logging.error(f"创建用户失败: {str(e)}")
 | 
						|
            return False, f'创建用户失败: {str(e)}'
 | 
						|
 |