目录
SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
pip3 install sqlalchemySQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
"""
MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
    
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
    
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
    
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
    
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
"""django中如何反向生成models
python manage.py inspectdb > app/models.pySQLAlchemy只能创建表,删除表,不能在原先的表上在进行修改,如果要进行修改,可以在数据库进行修改,然后再在对应的类上进行修改
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
engine = create_engine(
    "mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
def task(arg):
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        "select * from app01_book"
    )
    result = cursor.fetchall()
    print(result)
    cursor.close()
    conn.close()
for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()from sqlalchemy import create_enginecreate_engine()返回一个Engine的实例,并且它表示通过数据库语法处理细节的核心接口,在这种情况下,数据库语法将会被解释称Python的类方法
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/test',echo=True)连接 echo参数为True时,会显示每条执行的sql语句
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/test')from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()有了这个Base,我们可以依据这个base定义任意数量的映射类:
class User(Base):
    __tablename__ = 'users'  # 数据库表名称
    id = Column(Integer, primary_key=True)  # id 主键
    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
    # email = Column(String(32), unique=True)
    #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    # ctime = Column(DateTime, default=datetime.datetime.now)
    # extra = Column(Text, nullable=True)
    __table_args__ = (
        # UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
        # Index('ix_id_name', 'name', 'email'), #索引
    )注意: 用Declarative 构造的一个类至少需要一个tablename属性,一个主键行。
SQLAlchemy不能通过类似于与django的makemigerations和migerate自动生成表,需要我们自己进行表的生成
def init_db():
    """
    根据类创建数据库表
    :return:
    """
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.create_all(engine)SQLAlchemy不支持在表创建完成后,再进行表里面的字段进行修改,增加,删除,所以如果要进行表的字段修改,有两种方法:
def drop_db():
    """
    根据类删除数据库表
    :return:
    """
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.drop_all(engine)import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base()
class Users(Base):
    __tablename__ = 'users'  # 数据库表名称
    id = Column(Integer, primary_key=True)  # id 主键
    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
    age = Column(Integer, default=0)
    # email = Column(String(32), unique=True)
    #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    # ctime = Column(DateTime, default=datetime.datetime.now)
    # extra = Column(Text, nullable=True)
    __table_args__ = (
        # UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
        # Index('ix_id_name', 'name', 'email'), #索引
    )
def init_db():
    """
    根据类创建数据库表
    :return:
    """
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.create_all(engine)
def drop_db():
    """
    根据类删除数据库表
    :return:
    """
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    Base.metadata.drop_all(engine)
if __name__ == '__main__':
    # drop_db()
    init_db()| 数据类型 | 说明 | 
|---|---|
| Integer | 整形,映射到数据库中是int类型。 | 
| Float | 浮点类型,映射到数据库中是float类型。他占据的32位。 | 
| Double | 双精度浮点类型,映射到数据库中是double类型,占据64位。 | 
| String | 可变字符类型,映射到数据库中是varchar类型. | 
| Boolean | 布尔类型,映射到数据库中的是tinyint类型。 | 
| DECIMAL | 定点类型。是专门为了解决浮点类型精度丢失的问题的。在存储钱相关的字段的时候建议大家都使用这个数据类型。并且这个类型使用的时候需要传递两个参数,第一个参数是用来标记这个字段总能能存储多少个数字,第二个参数表示小数点后有多少位。 | 
| Enum | 枚举类型。指定某个字段只能是枚举中指定的几个值,不能为其他值。在ORM模型中,使用Enum来作为枚举 | 
| Date | 存储时间,只能存储年月日。映射到数据库中是date类型。在Python代码中,可以使用 datetime.date来指定 | 
| DateTime | 存储时间,可以存储年月日时分秒毫秒等。映射到数据库中也是datetime类型。在Python代码中,可以使用 datetime.datetime来指定。 | 
| Time | 存储时间,可以存储时分秒。映射到数据库中也是time类型。在Python代码中,可以使用 datetime.time来至此那个。 | 
| Text | 存储长字符串。一般可以存储6W多个字符。如果超出了这个范围,可以使用LONGTEXT类型。映射到数据库中就是text类型。 | 
| LONGTEXT | 长文本类型,映射到数据库中是longtext类型。 | 
| 参数 | 详情 | 
|---|---|
| default | 默认值 | 
| nullable | 是否为空 | 
| primary_key | 主键 | 
| unique | 是否唯一 | 
| autoincrement | 是否自增 | 
| onupdate | 更新时执行的 | 
| name | 数据库映射后的属性 | 
| index | 是否建立索引 | 
user1 = User(name='hades', age=18)
user2 = User(name='bonnie', age=16)准备好和数据库会话了,ORM通过Session与数据库建立连接的
当应用第一次载入时,我们定义一个Session类(声明Create_engine()的同时),这个Session类为新的Session对象提供工厂服务。
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)这个定制的Session类会创建绑定到数据库的Session对象。如果需要和数据库建立连接,只需要实例化一个session对象
session =Session()虽然上面的Session已经和数据库引擎Engine关联,但是还没有打开任何连接。当它第一次被使用时,就会从Engine维护的一个连接池中检索是否存在连接,如果存在便会保持连接知道我们提交所有更改并且/或者关闭session对象。
# 增加一个
session.add(user1)
session.add(user2)
# 增加多个,可以增加不同的映射实例
# session.add_all([user1, user2, Hosts(ip='127.0.0.1')])至此,我们可以认为,新添加的这个对象实例还在等待着;user1对象现在并不代表数据库中的一行数据。直到使用flush进程,Session才会让SQL保持连接。如果查询这条数据的话,所有等待信息会被第一时间刷新,查询结果也会立即发行。
 session.commit()session.rollback()通过Session的query()方法创建一个查询对象。这个函数的参数数量是可变的,参数可以是任何类或者类的描述集合
下面是一个迭代输出User类的例子:
session.query(Users).filter_by(name='lqz').first()session.query(User).order_by(User.id).all()
# desc(): 降序,一定要加()
session.query(User).order_by(User.id.desc()).all()
# asc():升序
session.query(User).order_by(Users.name.desc(),User.id.asc()).all()Query也支持ORM描述作为参数。任何时候,多个类的实体或者是基于列的实体表达都可以作为query()函数的参数,返回类型是元组:
session.query(User.name,User.fullname)
session.query(User,User.name).all()label()相当于row.namesession.query(User.name.label("name_label")).all()aliased()from sqlalchemy.orm import aliased
user_alias = aliased(User,name='user_alias')
session.query(user_alias,user_alias.name).all()Query 的基本操作包括LIMIT和OFFSET,使用python数组切片和ORDERBY结合可以让操作变得很方便。
只查询第二条和第三条数据
session.query(User).order_by(User.id)[1:3]使用关键字变量过滤查询结果,filter 和filter_by都使用
session.query(User).filter(User.name=='hades').all()
session.query(User).filter_by(name='bonnie').all()filter与filter_by的区别:
equals
session.query(User).filter(User.name == 'ed')not equals
session.query(User).filter(User.name != 'ed')like
session.query(User).filter(User.name.like('%ed%'))in
query.filter(User.name.in_(['ed','wendy','jack']))
# 子查询
session.query(User).filter(User.name.in_(session.query(User.name).filter(User.name.like('%ed%'))not in
query.filter(~User.name.in_('ed','wendy','jack'))is null
session.query(User).filter(User.name == None) is not null
session.query(User).filter(User.name != None)and
session.query(Users).filter(and_(User.name =='ed',User.fullname =='Ed Jones')) # and
session.query(Users).filter(User.name == 'ed',User.fullname =='Ed Jones') # and
session.query(Users).filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')# andor
query.filter(or_(User.name='ed', User.name='wendy'))占位符查找
#:value 和:name 相当于占位符,用params传参数
session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()count = session.query(User).filter(User.name.like("%t%")).count()session.query(func.count(User.name),User.name).group_by(User.name)having作为分组的筛选条件
session.query(func.min(User.id), func.avg(User.id)).group_by(Users.name).having(func.min(Users.id) >2).all()func.count:统计行的数量,和count作用一样
fc=session.query(func.count(User.name),User.name).group_by(User.name).all()func.avg:求平均值
fc=session.query(func.avg(User.age),User.name).group_by(User.name).all()func.max:求最大值
fc=session.query(func.max(User.age),User.name).group_by(User.name).all()func.min:求最小值
fc=session.query(func.min(User.age),User.name).group_by(User.name).all()func.sum:求和
fc=session.query(func.sum(User.age),User.name).group_by(User.name).all()第一种:先查询出对象,然后再赋予对象字段新的值
obj = session.query(User).filter(User.name=='hades').first()
obj.age = 27
session.commit()  # 一定要提交第二种:update()方法,需要传入一个字典
session.query(User).filter(User.name=='hades').update({'age':27})
session.commit()  # 一定要提交第三种:在原先的基础上增加,类似于django中的F查询
比如:年龄加1岁
注意:后面必须配合synchronize_session
synchronize_session=Falsesynchronize_session=evaluatasession.query(User).filter(User.id > 0).update({User.name: User.name + "099"}, synchronize_session=False)
# session.query(User).filter(User.id > 0).update({"age": User.age + 1}, synchronize_session="evaluate")
# session.commit()session.query(Users).filter(Users.id > 4).delete()
session.commit()原文:https://www.cnblogs.com/Hades123/p/11789918.html