SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用对象关系映射(ORM)进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据库API执行SQL并获取执行结果。

Dialect用于和数据库API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html

步骤一:
使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Step 1: send raw SQL strings through the Engine.
# NOTE: Engine.execute() is the SQLAlchemy 1.x API.

from sqlalchemy import create_engine

# The engine must exist before it can be used; the DSN below is the same
# placeholder used by the later examples in this article -- adjust as needed.
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/test")

# INSERT with literal values written directly in the SQL text.
engine.execute(
    "INSERT INTO ts_test (a, b) VALUES ('2', 'v1')"
)

# INSERT with positional placeholders; a tuple of parameter tuples is passed.
engine.execute(
    "INSERT INTO ts_test (a, b) VALUES (%s, %s)",
    ((555, "v1"), (666, "v1"),)
)

# INSERT with named placeholders supplied as keyword arguments.
engine.execute(
    "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",
    id=999, name="v1"
)

# Read everything back.
result = engine.execute('select * from ts_test')
result.fetchall()
步骤二:
使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。
#!/usr/bin/env python
# Step 2: describe tables with Table/Column objects and create them.

from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey

# The engine is required by create_all() below; placeholder DSN matching the
# later examples in this article -- adjust as needed.
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/test")

metadata = MetaData()

# Table "user": integer primary key plus a 20-character name.
user = Table(
    'user', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

# Table "color": same layout as "user".
color = Table(
    'color', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

# Emit CREATE TABLE for every table registered on this MetaData.
metadata.create_all(engine)

添加(在上述代码后面添加):
# Builds the SQL statement: INSERT INTO "user" (id, name) VALUES (:id, :name)
# The context manager closes the connection even if execute() raises.
with engine.connect() as conn:
    conn.execute(user.insert(), {'id': 7, 'name': 'seven'})

删除(同上):

# Open a fresh connection: the one used by the insert snippet was already closed.
with engine.connect() as conn:
    # sql = user.insert().values(id=123, name='hetan')
    # conn.execute(sql)

    # DELETE every row whose id is greater than 1.
    sql = user.delete().where(user.c.id > 1)
    conn.execute(sql)

修改(同上):

# Open a fresh connection: the one used by the insert snippet was already closed.
with engine.connect() as conn:
    # UPDATE rows named 'hetan', setting their name to 'ed'.
    sql = user.update().where(user.c.name == 'hetan').values(name='ed')
    conn.execute(sql)

查询(同上):
# select() is not part of the import line of the table-definition snippet,
# so bring it into scope here.
from sqlalchemy import select

# Open a fresh connection: the one used by the insert snippet was already closed.
with engine.connect() as conn:
    # SELECT all columns of the "user" table (SQLAlchemy 1.x list-style select).
    sql = select([user, ])
    result = conn.execute(sql)
    print(result.fetchall())

查询语句还有如下:
# Alternative SELECT statements (uncomment one to try it):
# Only the id column:
# sql = select([user.c.id, ])
# Names from both tables, for rows where user.id equals color.id:
# sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
# User names ordered by name:
# sql = select([user.c.name]).order_by(user.c.name)
# Users grouped by name:
# sql = select([user]).group_by(user.c.name)
一个完整的例子:
#!/usr/bin/env python
# Complete example: declare a mapped class, create its table, insert two rows.

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

Base = declarative_base()

# The engine is required by create_all() and sessionmaker() below; placeholder
# DSN matching the later examples in this article -- adjust as needed.
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/test")


class Host(Base):
    # Mapped to the "hosts" table.
    __tablename__ = 'hosts'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
    ip_addr = Column(String(128), unique=True, nullable=False)
    port = Column(Integer, default=22)  # defaults to 22 when not given


# Create the tables for every class derived from Base.
Base.metadata.create_all(engine)

if __name__ == '__main__':
    SessionCls = sessionmaker(bind=engine)
    session = SessionCls()

    h1 = Host(hostname='localhost', ip_addr='127.0.0.1')
    h2 = Host(hostname='unbuntu', ip_addr='192.168.1.1')

    session.add_all([h1, h2])
    session.commit()

# Continues with the `session` and `Host` defined in the complete example.
h3 = Host(hostname='ubuntu2', ip_addr='192.168.2.244', port=20000)
h3.hostname = 'ubuntu_test'  # still fine to modify as long as nothing has been committed
# h3 has not been added to the session yet, so it is added after the rollback.
session.rollback()
session.add(h3)
session.commit()

注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成。
步骤三:
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。
创建表:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Step 3: declare an ORM class, create its table and open a session.

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

Base = declarative_base()

# The engine is required by create_all() and sessionmaker() below; placeholder
# DSN matching the later examples in this article -- adjust as needed.
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/test")


class User(Base):
    # Mapped to the "users" table.
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))


# Find all subclasses of Base and create the corresponding tables in the
# database according to their structure.
Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

添加:
# Add a single object.
u = User(id=2, name='sb')
session.add(u)

# Add several objects at once.
session.add_all([
    User(id=3, name='sb'),
    User(id=4, name='sb'),
])

# Nothing is persisted until the session is committed.
session.commit()
删除:
# Delete every User whose id is greater than 2, then commit.
session.query(User).filter(User.id > 2).delete()
session.commit()

修改:
# filter() only builds a Query; .first() is needed to load the User object.
# Assigning to an attribute of the Query itself would not touch any row.
res = session.query(User).filter(User.id == 2).first()
if res is not None:  # .first() returns None when no row matches
    res.id = 3
    print(res.id)
    session.commit()

# Query: first matching row (None when nothing matches).
ret = session.query(User).filter_by(name='sb').first()
if ret is not None:
    print(ret.id)

# Query: all matching rows as a list.
ret = session.query(User).filter_by(name='sb').all()
print(ret)

session.commit()

# More query variants (uncomment one to try it):
# Names contained in a list:
# ret = session.query(User).filter(User.name.in_(['sb', 'bb'])).all()
# print(ret)
# Select a single column under a label:
# ret = session.query(User.name.label('name_label')).all()
# print(ret, type(ret))
# Order by id:
# ret = session.query(User).order_by(User.id).all()
# print(ret)
# Order by id and slice the result:
# ret = session.query(User).order_by(User.id)[1:3]
# print(ret)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Foreign-key example: each Host row references one Group through group_id.

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, Table
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

Base = declarative_base()
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/test", echo=True)


class Host(Base):
    # Mapped to the "hosts" table.
    __tablename__ = 'hosts'

    id = Column(Integer, primary_key=True, autoincrement=True)
    group_id = Column(Integer, ForeignKey('group.id'))  # references group.id
    hostname = Column(String(64), unique=True, nullable=False)
    ip_addr = Column(String(128), unique=True, nullable=False)
    port = Column(Integer, default=22)
    # Relationship to Group; backref adds a `host` attribute on Group.
    groups = relationship('Group', backref='host')

    def __repr__(self):
        return '<id=%s hostname=%s ip_addr=%s>' % (self.id, self.hostname, self.ip_addr)


class Group(Base):
    # Mapped to the "group" table.
    __tablename__ = 'group'

    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=True)

    def __repr__(self):
        return '<id=%s name=%s>' % (self.id, self.name)


Base.metadata.create_all(engine)

SessionCls = sessionmaker(bind=engine)
session = SessionCls()

# Seed four groups and two hosts (the hosts are not assigned to a group yet).
g1 = Group(name='g1')
g2 = Group(name='g2')
g3 = Group(name='g3')
g4 = Group(name='g4')
session.add_all([g1, g2, g3, g4])

h1 = Host(hostname='hetan', ip_addr='127.0.0.1')
h2 = Host(hostname='liuyao', ip_addr='10.0.0.1')
session.add_all([h1, h2])

session.commit()

# Look up group "g4" and point host "hetan" at it via its group_id column.
g4 = session.query(Group).filter(Group.name == 'g4').first()
# Query.update() issues a bulk UPDATE; h1 holds its return value, not a Host object.
h1 = session.query(Host).filter(Host.hostname == 'hetan').update({'group_id': g4.id})
session.commit()

#!/usr/bin/env python
# Many-to-many example: hosts and groups linked through an association table.

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, Table
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

Base = declarative_base()
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/test", echo=True)

# Association table: one row per (host, group) pair.
host_to_group = Table(
    'host_2_group', Base.metadata,
    Column('host_id', ForeignKey('hosts.id'), primary_key=True),
    Column('group_id', ForeignKey('group.id'), primary_key=True),
)


class Host(Base):
    # Mapped to the "hosts" table.
    __tablename__ = 'hosts'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
    ip_addr = Column(String(128), unique=True, nullable=False)
    port = Column(Integer, default=22)
    # Linked to Group through host_to_group; backref adds `host` on Group.
    groups = relationship('Group', secondary=host_to_group, backref='host')

    def __repr__(self):
        return '<id=%s hostname=%s ip_addr=%s>' % (self.id, self.hostname, self.ip_addr)


class Group(Base):
    # Mapped to the "group" table.
    __tablename__ = 'group'

    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=True)

    def __repr__(self):
        return '<id=%s name=%s>' % (self.id, self.name)


Base.metadata.create_all(engine)

SessionCls = sessionmaker(bind=engine)
session = SessionCls()

g1 = Group(name='g1')
g2 = Group(name='g2')
g3 = Group(name='g3')
g4 = Group(name='g4')
session.add_all([g1, g2, g3, g4])

h1 = Host(hostname='hetan', ip_addr='127.0.0.1')
h2 = Host(hostname='liuyao', ip_addr='10.0.0.1')
session.add_all([h1, h2])

groups = session.query(Group).all()
hosts = session.query(Host).all()
print(hosts, groups)

h1.groups = groups[1:-1]  # associate h1 with all groups except the first and last
g1.host = hosts           # associate g1 with every host

# Persist the new rows and associations.
session.commit()


# Load the first group and the first host and print each side of the relationship.
g1 = session.query(Group).first()
h1 = session.query(Host).first()
print('--->', g1.host)
print('--->', h1.groups)
session.commit()

成功
原文:http://www.cnblogs.com/hetan/p/5331127.html