自学内容网 自学内容网

sqlalchemy异步方法使用

建立模型

  1. 创建基类base.py

    from sqlalchemy.orm import DeclarativeBase
    
    class Base(DeclarativeBase):
        pass
    
  2. 以用户为例,建立用户模型继承基类

    from sqlalchemy import Integer, String, ForeignKey, DateTime, Boolean
    from sqlalchemy.orm import mapped_column, Mapped
    
    from src.db.model.base import Base
    
    # 用户表
    class User(Base):
        __tablename__ = 'user'
        id: Mapped[int] = mapped_column(Integer, primary_key=True, comment='用户id')
        username: Mapped[str] = mapped_column(String(64), unique=True, nullable=True, comment='用户名称')
        password: Mapped[str] = mapped_column(String(64), comment='用户密码')
        name: Mapped[str] = mapped_column(String(32), comment='姓名')
        mobile_phone: Mapped[str] = mapped_column(String(32), comment='手机号')
        cloud_role_id: Mapped[int] = mapped_column(Integer, ForeignKey('cloud_role.id'), comment='平台角色id')
        user_group_id: Mapped[int] = mapped_column(Integer, ForeignKey('user_group.id'), comment='所在用户组id')
        status: Mapped[Boolean] = mapped_column(Boolean, comment='状态')
        register_time: Mapped[str] = mapped_column(String(32), comment='注册时间')
        last_login_time: Mapped[str] = mapped_column(DateTime, comment='最后登录时间')
    
        __table_args__ = ({'comment': '用户表'})
    
    

    comment表示注释,生产mysql的数据表里面会带上注释

  3. 建立异步引擎和session,后面接口的async_session()都从这里引入

    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
    
    engine = create_async_engine(DATABASE_URL, future=True, pool_pre_ping=True, pool_recycle=3600)
    async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
    

编写异步接口

  1. 添加用户

    class UserDao:
        def __init__(self):
            pass
    
       @classmethod
        async def add_user(cls, username: str, password: str, name: str, mobile_phone: Optional[str], cloud_role_id: int,
                           user_group_id: int, status: int) -> bool:
            if mobile_phone is None:
                mobile_phone = ''
            # 对密码进行hash
            md_password = hashlib.md5(password.encode('utf-8')).hexdigest()
            user = User(username=username, password=md_password, name=name,
                        mobile_phone=mobile_phone, cloud_role_id=cloud_role_id, user_group_id=user_group_id, status=status)
            user.register_time = datetime.datetime.now()
            user.last_login_time = user.register_time
            
    # 使用with包裹可以自动处理session的commit()和rollback()
            async with async_session() as session:
                async with session.begin():
                    try:
                        session.add(user)
                        await session.flush()
                        return True
                    except Exception as e:
                        log.error(e)
                        return False
    
  2. 删除用户

        @classmethod
        async def delete_user(cls, user_id: int) -> bool:
            async with async_session() as session:
                async with session.begin():
                    try:
                        query_sql = select(User).filter(User.id == user_id)
                        user = (await session.execute(query_sql)).scalar()
                        if user is not None:
                            # 删除站点关联的用户记录
                            delete_relation_sql = delete(StationSiteUsers).filter(StationSiteUsers.user_id == user_id)
                            await session.execute(delete_relation_sql)
                            # 删除用户
                            delete_user_sql = delete(User).filter(User.id == user_id)
                            await session.execute(delete_user_sql)
                            return True
                        else:
                            return False
                    except Exception as e:
                        log.error(e)
                        return False
    

    注意,使用select查询时获取单个要用scalar(),因为sqlalchemy的异步方法不支持query。

    一般是select搭配scalar()使用,query()搭配fisrt()使用

  3. 编辑用户

        @classmethod
        async def edit_user(cls, user_id: int, username: str, name: str, mobile_phone: Optional[str],
                            cloud_role_id: int, user_group_id: int, status: int) -> bool:
            if mobile_phone is None:
                mobile_phone = ''
            async with async_session() as session:
                async with session.begin():
                    try:
                        # 查询user
                        query_sql = select(User).filter(User.id == user_id)
                        user = (await session.execute(query_sql)).scalar()
                        if user is not None:
                            user.username = username
                            user.name = name
                            user.mobile_phone = mobile_phone
                            user.cloud_role_id = cloud_role_id
                            user.user_group_id = user_group_id
                            user.status = status
                            return True
                        else:
                            return False
                    except Exception as e:
                        log.error(e)
                        return False
    
  4. 查询用户信息

        # 获取所有用户信息
        @classmethod
        async def get_all_user_info(cls) -> List[dict]:
            async with async_session() as session:
                async with session.begin():
                    try:
                        query_sql = select(User.id, User.username, User.name, User.mobile_phone, User.cloud_role_id,
                                           User.status, User.user_group_id, UserGroup.group_name,
                                           User.last_login_time).join(
                            CloudRole, User.cloud_role_id == CloudRole.id).join(
                            UserGroup, User.user_group_id == UserGroup.id)
                        results = (await session.execute(query_sql)).fetchall()
                        user_info_list = []
                        for result in results:
                            user_dict = {'user_id': result[0], 'username': result[1], 'name': result[2],
                                         'mobile_phone': result[3],
                                         'cloud_role_id': result[4], 'status': result[5], 'user_group_id': result[6],
                                         'user_group_name': result[7],
                                         'last_login_time': datetime.datetime.strftime(result[8], '%Y-%m-%d %H:%M:%S')}
                            user_info_list.append(user_dict)
                        return user_info_list
                    except Exception as e:
                        log.error(e)
                        return []
    

使用pytest编写单元测试

  1. 测试user方法

    import pytest
    
    from src.db.async_controller.config import engine
    from src.db.async_dao.user_dao import UserDao
    
    user_dao = UserDao()
    
    
    class TestUserDao:
        def setup_method(self):
            pass
    
        def teardown_method(self):
            engine.dispose()
    
        @pytest.mark.asyncio
        async def test_add_user(self):
            await user_dao.add_user(username='superadmin', password="superadmin", name="superadmin",
                                    mobile_phone="123456", cloud_role_id=1, user_group_id=1, status=1)
            await user_dao.add_user(username='cdy', password="888888", name="cdy",
                                    mobile_phone="123456", cloud_role_id=2, user_group_id=2, status=1)
            # 管理员账号
            await user_dao.add_user(username='admin', password="888888", name="admin",
                                    mobile_phone="123456", cloud_role_id=1, user_group_id=3, status=1)
            await user_dao.add_user(username='admin1', password="888888", name="admin",
                                    mobile_phone="123456", cloud_role_id=1, user_group_id=3, status=1)
            await user_dao.add_user(username='admin2', password="888888", name="admin",
                                    mobile_phone="123456", cloud_role_id=1, user_group_id=3, status=1)
            # 普通账号
            await user_dao.add_user(username='hongdou1', password="888888", name="test",
                                    mobile_phone="123456", cloud_role_id=2, user_group_id=4, status=1)
            await user_dao.add_user(username='hongdou2', password="123456", name="test",
                                    mobile_phone="123456", cloud_role_id=2, user_group_id=4, status=1)
            await user_dao.add_user(username='hongdou3', password="123456", name="test",
                                    mobile_phone="123456", cloud_role_id=2, user_group_id=4, status=1)
            await user_dao.add_user(username='hongdou4', password="123456", name="test",
                                    mobile_phone="123456", cloud_role_id=2, user_group_id=4, status=1)
    
        @pytest.mark.asyncio
        async def test_delete_user(self):
            result = await user_dao.delete_user(user_id=7)
            print(result)
    
        @pytest.mark.asyncio
        async def test_get_accessible_user_list(self):
            print(await user_dao.get_accessible_user_list(user_id=1))
    
    

原文地址:https://blog.csdn.net/QAZ600888/article/details/144449998

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!