之前写过一篇博客介绍过sqlalchemy的基本用法,本篇博客主要介绍除增删改查以外SQLAlchemy对数据库表的操作,主要内容有单表操作、一对多操作、多对多操作。
单表操作的增删改查在上篇博客中已经详细介绍过,这里不再详细介绍,今天主要对数据库查询在详细介绍一下,下面我们先创建表并插入数据。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | #!/usr/bin/env python# -*- coding: utf-8 -*- fromsqlalchemy importand_, or_fromsqlalchemy importcreate_enginefromsqlalchemy.ext.declarative importdeclarative_basefromsqlalchemy importColumn, Integer, String, ForeignKey, UniqueConstraint, Indexfromsqlalchemy.orm importsessionmaker, relationship Base =declarative_base() classGroup(Base):    __tablename__=‘group‘    nid =Column(Integer, primary_key=True,autoincrement=True)    caption =Column(String(32)) classUser(Base):    __tablename__=‘user‘    nid =Column(Integer, primary_key=True,autoincrement=True)    username =Column(String(32))    group_id =Column(Integer, ForeignKey(‘group.nid‘))    group =relationship("Group",backref=‘uuu‘)  #跟Group表建立关系,方便查询,常和ForeignKey在一起使用 definit_table():    """    创建表,调用Base类的子类    :return:    """    Base.metadata.create_all(engine) defdrop_table():    Base.metadata.drop_all(engine) init_table()Session =sessionmaker(bind=engine)session =Session() # 单表操作:session.add(Group(caption=‘dba‘))   #往组里添加数据session.add(Group(caption=‘dddd‘))session.commit() session.add_all([    User(username=‘jack1‘,group_id=1),    User(username=‘jack2‘,group_id=1),    User(username=‘jack1‘,group_id=2),    User(username=‘jack1‘,group_id=1),    User(username=‘jack2‘,group_id=1),])session.commit() | 
| 1 2 3 4 5 6 7 8 9 | #查询用户jack1的nid,filter和filter_by两种书写方式ret1 =session.query(User.nid).filter(User.username==‘jack1‘).all()print(ret1)ret2 =session.query(User.nid).filter_by(username=‘jack1‘).all()print(ret2)#结果:[(1,), (3,), (4,)][(1,), (3,), (4,)] | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | #查询用户nid大于1并且username等于jack2的nidret1 =session.query(User.nid).filter(User.nid >1,User.username==‘jack2‘).all()print(ret1)#结果:[(2,), (5,)]#查询nid在1和3之间username等于jack1的所有信息ret2=session.query(User.nid,User.username).filter(User.nid.between(1,3),User.username==‘jack1‘).all()print(ret2)#结果:[(1, ‘jack1‘), (3, ‘jack1‘)] | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | #查询用户nid在1,3,4这个列表里的用户信息ret=session.query(User.nid,User.username).filter(User.nid.in_([1,3,4])).all()print(ret)  #结果:[(1, ‘jack1‘), (3, ‘jack1‘), (4, ‘jack1‘)]  #取反,查询用户nid不在1,3,4这个列表里的用户信息ret1=session.query(User.nid,User.username).filter(~User.nid.in_([1,3,4,])).all()print(ret1)  #结果:[(2, ‘jack2‘), (5, ‘jack2‘)] #查询username=‘jack1‘的所有信息ret2 =session.query(User.nid,User.username).filter(User.nid.in_(session.query(User.nid).filter_by(username=‘jack1‘))).all()print(ret2) #结果:[(1, ‘jack1‘), (3, ‘jack1‘), (4, ‘jack1‘)] | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | #查询nid大于3并且username=‘jack1‘的信息ret =session.query(User.nid,User.username).filter(and_(User.nid >3,User.username==‘jack1‘)).all()print(ret)#结果:[(4, ‘jack1‘)]#查询nid小于2或者username等于jack1的数据ret =session.query(User.nid,User.username).filter(or_(User.nid < 2, User.username ==‘jack1‘)).all()print(ret)#查询用户nid小于2或者username等于jack1并且nid大于3的信息ret =session.query(User.nid,User.username).filter(    or_(User.nid < 2,and_(User.username ==‘jack1‘, User.nid > 3))).all()print(ret)#结果:[(1, ‘jack1‘), (4, ‘jack1‘)] | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | #模糊匹配用户名以字母j开头的所有数据ret =session.query(User.nid,User.username).filter(User.username.like(‘j%‘)).all()#结果:[(1, ‘jack1‘), (2, ‘jack2‘), (3, ‘jack1‘), (4, ‘jack1‘), (5, ‘jack2‘)]#取反ret1 =session.query(User.nid,User.username).filter(~User.username.like(‘j%‘)).all()print(ret)print(ret1)#结果:[] | 
| 1 2 3 4 5 | ret=session.query(User.nid,User.username)[1:2]print(ret)#结果:[(2, ‘jack2‘)] | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | #倒序排序ret=session.query(User.nid,User.username).order_by(User.nid.desc()).all()print(ret)   #结果:[(5, ‘jack2‘), (4, ‘jack1‘), (3, ‘jack1‘), (2, ‘jack2‘), (1, ‘jack1‘)]#正序排序ret1=session.query(User.nid,User.username).order_by(User.nid.asc()).all()print(ret1)#结果:[(1, ‘jack1‘), (2, ‘jack2‘), (3, ‘jack1‘), (4, ‘jack1‘), (5, ‘jack2‘)] | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | #导入模块fromsqlalchemy.sql importfunc ret =session.query(User.nid,User.username).group_by(User.nid).all()print(ret) #结果:[(1, ‘jack1‘), (2, ‘jack2‘), (3, ‘jack1‘), (4, ‘jack1‘), (5, ‘jack2‘)] ret1=session.query(    func.max(User.nid),    func.sum(User.nid),    func.min(User.nid),).group_by(User.username).all()print(ret1) #结果:[(4, Decimal(‘8‘), 1), (5, Decimal(‘7‘), 2)] ret2=session.query(    func.max(User.nid),    func.sum(User.nid),    func.min(User.nid), ).group_by(User.username).having(func.min(User.nid)>1).all()print(ret2) #结果:[(5, Decimal(‘7‘), 2)]#打印SQL语句:fromsqlalchemy.sql importfuncret2=session.query(    func.max(User.nid),    func.sum(User.nid),    func.min(User.nid), ).group_by(User.username).having(func.min(User.nid)>1)print(ret2)#结果:SELECT max("user".nid) AS max_1, sum("user".nid) AS sum_1, min("user".nid) AS min_1 FROM "user"GROUP BY "user".username HAVING min("user".nid) > :min_2[(‘jack1‘, ‘dba‘), (‘jack2‘, ‘dddd‘)]SELECT "user".nid AS user_nid, "user".username AS user_username, "user".group_id AS user_group_id FROM "user"LEFT OUTER JOIN "group"ON "group".nid ="user".group_id | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | q1=session.query(User.username).filter(User.nid >2)q2=session.query(Group.caption).filter(Group.nid <2)ret =q1.union(q2).all()print(ret)#结果:[(‘jack1‘,), (‘jack2‘,), (‘dba‘,)]q1=session.query(User.username).filter(User.nid >2)q2=session.query(Group.caption).filter(Group.nid <2)ret =q1.union_all(q2).all()print(ret)#结果:[(‘jack1‘,), (‘jack1‘,), (‘jack2‘,), (‘dba‘,)] | 
一对多的关系就需要我们外键来进行约束,下面我们来举例来说明一对多进行连表操作。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | ret =session.query(User.username,Group.caption).filter(User.nid==Group.nid).all()print(ret)  #结果:[(‘jack1‘, ‘dba‘), (‘jack2‘, ‘dddd‘)]  #通过join来进行连表操作,加isouter的区别:sql1 =session.query(User).join(Group,isouter=True)print(sql1)  #结果:SELECT "user".nid AS user_nid, "user".username AS user_username, "user".group_id AS user_group_idFROM "user"LEFT OUTER JOIN "group"ON "group".nid ="user".group_id  sql2 =session.query(User).join(Group)print(sql2)  #结果:SELECT "user".nid AS user_nid, "user".username AS user_username, "user".group_id AS user_group_idFROM "user"JOIN "group"ON "group".nid ="user".group_id#连表操作ret =session.query(User.username,Group.caption).join(Group,isouter=True).filter(Group.caption ==‘dba‘).all()print(ret)#结果:[(‘jack1‘, ‘dba‘), (‘jack2‘, ‘dba‘), (‘jack1‘, ‘dba‘)] | 
| 1 2 3 4 5 6 7 | #首先在创建表的类中加入relationship字段classUser(Base):    __tablename__=‘user‘    nid =Column(Integer, primary_key=True,autoincrement=True)    username =Column(String(32))    group_id =Column(Integer, ForeignKey(‘group.nid‘))    group =relationship("Group",backref=‘uuu‘)   #跟Group表建立关系,方便查询,backref为虚拟列 | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | ret =session.query(User).all()forobj inret:    #obj代指user表的每一行数据    #obj.group代指group对象    print(obj.nid,obj.username,obj.group_id,obj.group_id,obj.group,          obj.group.nid,obj.group.caption)#结果:1jack1 11<__main__.Group objectat 0x0000015D762F4630> 1dba2jack2 11<__main__.Group objectat 0x0000015D762F4630> 1dba3jack1 22<__main__.Group objectat 0x0000015D762F47F0> 2dddd4jack1 11<__main__.Group objectat 0x0000015D762F4630> 1dba5jack2 22<__main__.Group objectat 0x0000015D762F47F0> 2dddd | 
反向查询:通过Group表查询User表
| 1 2 3 4 5 6 7 8 9 | obj =session.query(Group).filter(Group.caption ==‘dba‘).first()print(obj.nid)print(obj.caption)print(obj.uuu)#结果:1dba[<__main__.User objectat 0x000002606096C5C0>, <__main__.User objectat 0x000002606096C630>, <__main__.User objectat 0x000002606096C6A0>] | 
我们可以看到上面的例子输出的为对象的列表,输出不太友好,为了达到自己想要的结果,我们可以进行自定义返回结果,请看下面代码,加入__repr__函数:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | classUser(Base):    __tablename__=‘user‘    nid =Column(Integer, primary_key=True,autoincrement=True)    username =Column(String(32))    group_id =Column(Integer, ForeignKey(‘group.nid‘))    group =relationship("Group",backref=‘uuu‘)  #跟Group表建立关系,方便查询,常和ForeignKey在一起使用    def__repr__(self):        """        自定义返回结果        :return:        """        temp =‘%s:%s:%s‘%(self.nid,self.username,self.group_id)        returntemp | 
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | #!/usr/bin/env python# -*- coding: utf-8 -*-fromsqlalchemy importcreate_enginefromsqlalchemy.ext.declarative importdeclarative_basefromsqlalchemy importColumn, Integer, String, ForeignKey, UniqueConstraint, Indexfromsqlalchemy.orm importsessionmaker, relationshipBase =declarative_base()classHost(Base):    __tablename__ =‘host‘    nid =Column(Integer, primary_key=True,autoincrement=True)    hostname =Column(String(32))    port =Column(String(32))    ip =Column(String(32))classHostUser(Base):    __tablename__ =‘host_user‘    nid =Column(Integer, primary_key=True,autoincrement=True)    username =Column(String(32))#使用for循环时,通过正向反向查询classHostToHostUser(Base):    __tablename__ =‘host_to_host_user‘    nid =Column(Integer, primary_key=True,autoincrement=True)    host_id =Column(Integer,ForeignKey(‘host.nid‘))    host_user_id =Column(Integer,ForeignKey(‘host_user.nid‘))    host =relationship("Host",backref=‘h‘)    host_user =relationship("HostUser",backref=‘u‘)definit_table():    """    创建表,调用Base类的子类    :return:    """    Base.metadata.create_all(engine)defdrop_table():    Base.metadata.drop_all(engine)init_table()Session =sessionmaker(bind=engine)session =Session()session.add_all([    Host(hostname=‘c1‘,port=‘22‘,ip=‘1.1.1.1‘),    Host(hostname=‘c2‘,port=‘22‘,ip=‘1.1.1.2‘),    Host(hostname=‘c3‘,port=‘22‘,ip=‘1.1.1.3‘),    Host(hostname=‘c4‘,port=‘22‘,ip=‘1.1.1.4‘),    Host(hostname=‘c5‘,port=‘22‘,ip=‘1.1.1.5‘),])session.commit()session.add_all([    HostUser(username=‘root‘),    HostUser(username=‘db‘),    HostUser(username=‘nb‘),    HostUser(username=‘sb‘),])session.commit()session.add_all([    HostToHostUser(host_id=1,host_user_id=1),    HostToHostUser(host_id=1,host_user_id=2),    HostToHostUser(host_id=1,host_user_id=3),    HostToHostUser(host_id=2,host_user_id=2),    HostToHostUser(host_id=2,host_user_id=4),    HostToHostUser(host_id=2,host_user_id=3),])session.commit() | 
2,需求:获取主机1中所有的用户
方法一:通过一步一步取
| 1 2 3 4 5 6 7 8 9 10 11 12 | host_obj =session.query(Host).filter(Host.hostname ==‘c1‘).first()# #取出host_obj.nidhost_to_host_user =session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id==host_obj.nid).all()## #因为取出来的结果是[(1,),(2,),(3,)],我们通过内置函数zip来转换成想要的结果r =zip(*host_to_host_user)#users =session.query(HostUser.username).filter(HostUser.nid.in_(list(r)[0])).all()print(users)#结果:[(‘root‘,), (‘db‘,), (‘nb‘,)] | 
方法二:通过join的方式
| 1 2 | #通过代码整合的代码,相当复杂session.query(HostUser.name).filter(HostUser.nid.in_(session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id ==session.query(Host.nid).filter(Host.hostname ==‘c1‘)))) | 
方法三:通过建立relationship的方式
1,对象
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | #!/usr/bin/env python# -*- coding: utf-8 -*-fromsqlalchemy importcreate_enginefromsqlalchemy.ext.declarative importdeclarative_basefromsqlalchemy importColumn, Integer, String, ForeignKey, UniqueConstraint, Indexfromsqlalchemy.orm importsessionmaker, relationshipBase =declarative_base()classHostToHostUser(Base):    __tablename__ =‘host_to_host_user‘    nid =Column(Integer, primary_key=True,autoincrement=True)    host_id =Column(Integer,ForeignKey(‘host.nid‘))    host_user_id =Column(Integer,ForeignKey(‘host_user.nid‘))classHost(Base):    __tablename__ =‘host‘    nid =Column(Integer, primary_key=True,autoincrement=True)    hostname =Column(String(32))    port =Column(String(32))    ip =Column(String(32))    host_user =relationship(‘HostUser‘,secondary=HostToHostUser.__table__,backref=‘h‘)# host_user = relationship(‘HostUser‘, secondary=lambda: HostToHostUser.__table__, backref=‘h‘)#这里加lambda是因为关系表在下面,可以不加lambda,但是关系表要放上面classHostUser(Base):    __tablename__ =‘host_user‘    nid =Column(Integer, primary_key=True,autoincrement=True)    username =Column(String(32))definit_table():    """    创建表,调用Base类的子类    :return:    """    Base.metadata.create_all(engine)defdrop_table():    Base.metadata.drop_all(engine)Session =sessionmaker(bind=engine)session =Session()host_obj=session.query(Host).filter(Host.hostname==‘c1‘).first()print(host_obj.host_user) | 
2,类
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | #!/usr/bin/env python# -*- coding: utf-8 -*-fromsqlalchemy importcreate_enginefromsqlalchemy.ext.declarative importdeclarative_basefromsqlalchemy importColumn, Integer, String, ForeignKey, UniqueConstraint, Indexfromsqlalchemy.orm importsessionmaker, relationshipBase =declarative_base()classHostToHostUser(Base):    __tablename__ =‘host_to_host_user‘    nid =Column(Integer, primary_key=True,autoincrement=True)    host_id =Column(Integer,ForeignKey(‘host.nid‘))    host_user_id =Column(Integer,ForeignKey(‘host_user.nid‘))classHost(Base):    __tablename__ =‘host‘    nid =Column(Integer, primary_key=True,autoincrement=True)    hostname =Column(String(32))    port =Column(String(32))    ip =Column(String(32))    host_user =relationship(‘HostUser‘,secondary=HostToHostUser.__table__,backref=‘h‘)# host_user = relationship(‘HostUser‘, secondary=lambda: HostToHostUser.__table__, backref=‘h‘)#这里加lambda是因为关系表在下面,可以不加lambda,但是关系表要放上面classHostUser(Base):    __tablename__ =‘host_user‘    nid =Column(Integer, primary_key=True,autoincrement=True)    username =Column(String(32))definit_table():    """    创建表,调用Base类的子类    :return:    """    Base.metadata.create_all(engine)defdrop_table():    Base.metadata.drop_all(engine)Session =sessionmaker(bind=engine)session =Session()host_obj=session.query(Host).filter(Host.hostname==‘c1‘).first()print(host_obj.host_user) | 
今天SQLALchemy就介绍到这里,更多参考信息请参考:
原文:http://www.cnblogs.com/phennry/p/5731299.html