亲宝软件园·资讯

展开

Python Sqlalchemy

胡安民-独行者 人气:0

对象关系映射(Object Relational Mapping,简称ORM)模式是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术。面向对象的开发方法是当今企业级应用开发环境中的主流开发方法,关系数据库是企业级应用环境中永久存放数据的主流数据存储系统。对象和关系数据是业务实体的两种表现形式,业务实体在内存中表现为对象,在数据库中表现为关系数据。内存中的对象之间存在关联和继承关系,而在数据库中,关系数据无法直接表达多对多关联和继承关系。因此,对象-关系映射(ORM)系统一般以中间件的形式存在,主要实现程序对象到关系数据库数据的映射。

学过java的hibernate框架的那么这个很好上手,非常简单 ,他有两种模式一种纯orm另一种模式是支持原生sql这两种可以混合使用

优点:

缺点: 虽然性能稍稍不及原生SQL,但是操作数据库真的很方便!

官网: https://www.sqlalchemy.org/

概念和数据类型

概念

常见数据类型

安装

pip install SQLAlchemy

​​​​​​​pip install mysqlclient   # 安装自己的数据库客户端(可以是mysql 可以是oracle)

连接

from sqlalchemy import create_engine
engine = create_engine("mysql://user:password@hostname/dbname?charset=uft8",
            echo=True,
            pool_size=8,
            pool_recycle=60*30
            )

创建好了Engine的同时,Pool和Dialect也已经创建好了,但是此时并没有真正与数据库连接,等到执行具体的语句.connect()等时才会连接到数据库。

创建数据库表类(模型)

ORM的重要特点就是操作类来操作数据库,现在我们来创建一个类,以常见的用户表举例:

from sqlalchemy import Column, Integer, String
from src.database.SQLalchemyFast import SQLalchemyFast

class UserDB(SQLalchemyFast.Base):
    __tablename__ = "User"   # __tablename__ 声明表名
    # primary_key为主键,autoincrement为自增, doc 为注释但是不会在数据库中生成
    id = Column(Integer, primary_key=True,autoincrement=True,doc="主键")
    name = Column(String(64), unique=True, doc="用户名") # unique=True 为唯一约束会在数据库中生成索引
    email = Column(String(64), doc="邮箱")
    def __init__(self, name=None, email=None):
        self.name = name
        self.email = email

    def __str__(self):
           return "UserDB(id=%s,name=%s,email=%s)" % (self.id, self.name, self.email)

上面的SQLalchemyFast.Base是我自己封装的Base ,用于统一管理所有模型类,可以将Python类和数据库表关联映射起来。数据库表模型类通过__tablename__和表关联起来,Column表示数据表的列

生成数据库表

Base = declarative_base()
Base.metadata.create_all(engine)

会自动创建表,如果存在则忽略,执行以上代码,就会发现在db中创建了users表。 前提必须有模型类继承了Base

会话

会话就和打电话一样,打一次电话就是一个会话,就相当于和数据库交互一次就是一个会话,一个会话可以运行多个或单个语句,会话结束必须关闭

sqlalchemy中使用session用于创建程序和数据库之间的会话,所有对象的载入和保存都需要通过session对象 。

通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源:

from sqlalchemy.orm import sessionmaker

# 创建session
DbSession = sessionmaker(bind=engine)
session = DbSession()

session的常见操作方法包括:

增删改查

add_user = UserDB("test", "test123@qq.com")
session.add(add_user)
session.commit()

session.add()将会把Model加入当前session维护的持久空间(可以从session.dirty看到)中,直到commit时提交到数据库。

users = session.query(UserDB).filter(UserDB.id=1).all()
for item in users:
  print(item.name)

session.query(Users).filter(UserDB.id=1).update({'name': "Jack"})

session.query(UserDB).filter(UserDB.name == "test").delete()
session.commit()

执行裸sql

session.execute(text(sql), params)
session.commit()

sql: select * from User where id = :id and name = :name

params: {"id":1,"name":"张三"}`

参数名必须和sql语句中的参数名一致

with关闭会话

        DbSession = sessionmaker(bind=engine)
        with DbSession() as conn:
            # 代码
            conn.commit()

sql建造者模式

需要导入的包

from sqlalchemy import  delete, update, text, select, join, desc, asc

        sql = select(UserDB.id,UserDB.name).select_from(UserDB).\
            where(text("id = :id and name = :name")).\
            group_by(UserDB.id,UserDB.name).\
            having(text("id = :id and name = :name")).\
            order_by(desc("id"),asc("name")).\
            offset(1).limit(10).\
            params(id=1, name="张三")
        print(sql)

以上sql放入到execute里直接就能跑了

多表联查(只支持内查询和左查询和全查询)

        sql = select(UserDB.id,UserDB.name).select_from(UserDB).\
            join(BookDB,UserDB.id == BookDB.id).\
            join(alias(BookDB,"b"),text("b.id == b.id"),isouter=True).\
            join(alias(BookDB,"e"),text("e.id == e.id"),full=True). \
            where(text("id = :id and name = :name"))
        print(sql)

封装的工具

数据库配置文件database.properties

url=mysql://root:root@106.12.174.220:3306/demo?charset=utf8
echo=True # 是否打印sql语句
pool_size=10 # 连接池大小
pool_recycle=1800 # 连接池回收时间
pool_timeout=30 # 连接池超时时间
isolation_level=READ_COMMITTED # 事务隔离级别

工具

from sqlalchemy import create_engine, delete, update, text, alias
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base, sessionmaker

from src.file.FileReadAndWrite import FileReadAndWrite
from src.log.Log import Log


class SQLalchemyFast(object):
    Base = declarative_base()
    """
    功能: SQLalchemy工具
    """

    def __init__(self, dbFile):
        file = FileReadAndWrite.readPropertiesFile(dbFile)
        self.engine = create_engine(
            url=file['url'],
            echo=bool(file['echo']),
            pool_size=int(file['pool_size']),
            pool_recycle=int(file['pool_recycle']),
            pool_timeout=int(file['pool_timeout']),
            isolation_level=file['isolation_level'],
        )

        SQLalchemyFast.Base.metadata.create_all(self.engine)  # 创建表,如果表存在则不创建(必须对象继承Base)

    # 创建会话
    def createSession(self):
        Session = sessionmaker(bind=self.engine)
        return Session()

    # 添加一条数据
    def addData(self, object):
        with self.createSession() as conn:
            conn.add(object)
            conn.commit()

    # 添加多条数据
    def addDataList(self, objectList):
        with self.createSession() as conn:
            conn.add_all(objectList)
            conn.commit()

    # 删除主键id的数据
    def deleteDataById(self, cla, id):
        with self.createSession() as conn:
            conn.query(cla).filter(cla.id == id).delete()
            conn.commit()

    # 删除指定数据(where是并且的关系,不支持or和其他复杂查询)
    def deleteDataWhere(self, cla, *where):
        with self.createSession() as conn:
            stmt = delete(cla).where(*where)
            conn.execute(stmt)
            conn.commit()

    # 清空表
    def truncateTable(self, cla):
        with self.createSession() as conn:
            conn.query(cla).delete()
            conn.commit()

    # 更新指定主键id的数据
    def updateDataById(self, cla, id, data):
        """
        :param cla:  类(表)
        :param id:  主键id
        :param data:  {'key': "value",...}  key为表中的字段名,value为要修改的值
        :return:
        """
        with self.createSession() as conn:
            stmt = update(cla).where(cla.id == id).values(data)
            result = conn.execute(stmt)
            conn.commit()
            return result

    # 更新指定条件的数据 (where是并且的关系,不支持or和其他复杂查询)
    def updateDataWhere(self, cla, data, *where):
        """
        :param cla:  类(表)
        :param data:  {'key': "value",...}  key为表中的字段名,value为要修改的值
        :param where:  过滤条件
        :return:
        """
        with self.createSession() as conn:
            stmt = update(cla).where(*where).values(data)
            conn.execute(stmt)
            conn.commit()

    # 查询全部数据
    def queryDataAll(self, cla):
        with self.createSession() as conn:
            result = conn.query(cla).all()
        return result

    # 查询主键id的数据
    def queryDataById(self, cla, id):
        with self.createSession() as conn:
            result = conn.query(cla).filter(cla.id == id).first()
        return result

    # 查询指定数据,不支持分组查询(因为聚合后的数据无法转换成对象)
    def queryDataWhere(self, cla,aliasName=None, column=None, where=None,
                       join=None, on=None, left=None, full=None,
                       order="", limit="", offset="", distinct=None, params=None):
        with self.createSession() as conn:
            stmt = select(cla)
            if aliasName:
                stmt = select(alias(cla,aliasName))
            if column:
                stmt = stmt.with_only_columns(text(column)) .select_from(cla)
            if join is not None and on is not None:
                if left:
                    stmt = stmt.join(join, text(on), isouter=True)
                elif full:
                    stmt = stmt.join(join, text(on), full=True)
                else:
                    stmt = stmt.join(join, text(on))
            if where:
                stmt = stmt.where(text(where))
            if order:
                stmt = stmt.order_by(text(order))
            if limit:
                stmt = stmt.limit(limit)
            if offset:
                stmt = stmt.offset(offset)
            if distinct:
                stmt = stmt.distinct()
            result = conn.execute(stmt,params).all()
            result= [row[0] for row in result]
        return result

    # 创建事物(运行多条sql语句 ,function(conn)是一个函数,里面包含多条sql语句,需要使用原生的sqlalchemy)
    def createTransaction(self, function):
        with  self.createSession() as conn:
            conn.begin()
            try:
                function(conn)
                conn.commit()
            except Exception as e:
                Log.logError(e)
                conn.rollback()

    # 执行sql语句(包括增删改查,和存储过程...只要是sql语句都可以执行)
    def executeSql(self, sql, params=None):
        """
        :param sql:  sql语句  如: "select * from User where id = :id and name = :name "
        :param params:  参数  例如: {"id":1,"name":"张三"} 注意:参数名必须和sql语句中的参数名一致
        发送多个参数时,参数名必须以列表的形式传入,例如: {"id":["1","2"],"name":"张三"}
        "INSERT INTO some_table (x, y) VALUES (:x, :y)" 参数可以是 [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
        :return:
        """
        with self.createSession() as conn:
            result = conn.execute(text(sql), params)
            conn.commit()
        return result

    # 执行构建sql语句
    def executeSqlBuild(self, sql):
        with self.createSession() as conn:
            result = conn.execute(sql)
            conn.commit()
        return result

测试实体

from sqlalchemy import Column, Integer, String

from src.database.SQLalchemyFast import SQLalchemyFast


class UserDB(SQLalchemyFast.Base):
    __tablename__ = "User"   # __tablename__ 声明表名
    # primary_key为主键,autoincrement为自增, doc 为注释但是不会在数据库中生成
    id = Column(Integer, primary_key=True,autoincrement=True,doc="主键")
    name = Column(String(64), unique=True, doc="用户名") # unique=True 为唯一约束会在数据库中生成索引
    email = Column(String(64), doc="邮箱")
    def __init__(self, name=None, email=None):
        self.name = name
        self.email = email

    def __str__(self):
           return "UserDB(id=%s,name=%s,email=%s)" % (self.id, self.name, self.email)
from sqlalchemy import Column, Integer, String

from src.database.SQLalchemyFast import SQLalchemyFast


class BookDB(SQLalchemyFast.Base):
    __tablename__ = "Book"   # __tablename__ 声明表名
    # primary_key为主键,autoincrement为自增, doc 为注释但是不会在数据库中生成
    id = Column(Integer, primary_key=True,autoincrement=True,doc="主键")
    name = Column(String(64), unique=True, doc="用户名") # unique=True 为唯一约束会在数据库中生成索引
    email = Column(String(64), doc="邮箱")
    def __init__(self, name=None, email=None):
        self.name = name
        self.email = email

    def __str__(self):
           return "UserDB(id=%s,name=%s,email=%s)" % (self.id, self.name, self.email)

验证代码

import unittest
from sqlalchemy import delete, update, text, select, join, desc, asc, alias

from src.database.BookDB import BookDB
from src.database.SQLalchemyFast import SQLalchemyFast
from src.database.UserDB import UserDB
from src.file.FileTool import FileTool


class SQLalchemyFastTest(unittest.TestCase):
    # 测试添加数据
    def test_add(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.addData(UserDB("name1", "123456789"))
        db.addData(UserDB("name2", "123456789"))
        db.addData(UserDB("name3", "123456789"))
        db.addData(UserDB("name4", "123456789"))

    # 测试添加多条数据
    def test_addAll(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.addDataList([UserDB("name111", "123456789"), UserDB("name211", "123456789")])
    # 测试删除数据
    def test_deleteDataById(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.deleteDataById(UserDB, 1)

    # 测试条件删除数据
    def test_deleteWhere(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.deleteDataWhere(UserDB, UserDB.name == "name1", UserDB.email == "123456789")

    # 测试更新数据
    def test_update(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.updateDataById(UserDB, 10, {"name": "name31", "email": "123456789"})

    # 测试条件更新数据
    def test_updateFilter(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.updateDataWhere(UserDB, {"name": "name33", "email": "123456789"}, UserDB.name == "name2",
                           UserDB.email == "1231")

    # 测试查询数据
    def test_queryDataAll(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        data_all = db.queryDataAll(UserDB)
        for data in data_all:
            print(data)

    # 测试查询指定id数据
    def test_queryDataById(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        data = db.queryDataById(UserDB, 10)
        print(data)

    # 测试条件查询数据(不支持分组查询和链表查询)
    def test_queryDataWhere(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        data_all = db.queryDataWhere(UserDB,
                                     where="name like CONCAT(:name,'%')",
                                     order="id desc",
                                     offset=1,
                                     limit=3,
                                     params={"name": "name"})

        # db.queryDataWhere(UserDB,
        #                              where="name like CONCAT(:name,'%')",
        #                              order="id desc",
        #                              offset=1,
        #                              limit=3,
        #                              params={"name": "name"})

        # db.queryDataWhere(UserDB,aliasName="a",
        #                   join=alias(BookDB,"b"),on="a.id == b.id",
        #                   where="a.name like CONCAT(:name,'%')",
        #                   params={"name": "name"})
        for data in data_all:
            print(data)

    # 测试创建事物
    def test_createTransaction(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)

        def test1(conn):
            conn.add(UserDB("name111", "123456789"))
            conn.add(UserDB("name211", "123456789"))
            # raise Exception("test122")
            # conn.add(UserDB("name333", "123456789"))
            # conn.add(UserDB("name444", "123456789"))

        db.createTransaction(test1)

    # 测试执行sql(执行失败会回滚的(存储过程,函数))
    def test_executeSql(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        # data_all = db.executeSql("select * from User")
        # data_all = db.executeSql("select * from User where name like CONCAT(:name,'%')", params={"name":"name"})
        # for data in data_all:
        #     print(data)

        # 创建存储过程
        # db.executeSql("CREATE PROCEDURE `test_procedure` \
        # (IN `in_name` VARCHAR(255), IN `in_email` VARCHAR(255)) \
        # BEGIN \
        # INSERT INTO `User` (`name`, email) VALUES (in_name, in_email); \
        # END")

        # 调用存储过程
        # db.executeSql("call test_procedure(:name, :email)", params={"name": "name555", "email": "email12131"})

        # 创建函数
        # db.executeSql("CREATE FUNCTION `test_function`( `in_name` VARCHAR(255),  `in_email` VARCHAR(255)) \
        #  RETURNS INT(11) \
        #  BEGIN \
        #  DELETE FROM `User`; \
        #  INSERT INTO `User` (`name`, email) VALUES (in_name, in_email); \
        #  INSERT INTO `User` (`name`, email) VALUES (in_name, in_email); \
        #  RETURN 1; \
        #  END")
        # 调用函数
        # data_all = db.executeSql("select test_function(:name, :email)", params={"name": "name5551", "email": "email12131"})



    # 测试sql构造
    def test_executeSqlBuild(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        # sql = select(UserDB.id,UserDB.name).select_from(UserDB).\
        #     join(BookDB,UserDB.id == BookDB.id)
        # print(sql)
        # db.executeSqlBuild(sql)

加载全部内容

相关教程
猜你喜欢
用户评论