首页 > 数据库技术 > 详细

pymysql操作数据库【进阶版】

时间:2021-05-17 21:44:31      阅读:13      评论:0      收藏:0      [点我收藏+]

配置文件,名称为config.py

#config.py
rds_v1 = {
    "host": "127.0.0.1",
    "user": "root",
    "password": "12345678",
    "database": "users",
    "port": 3306
}

类对象内容

#database.py
import pymysql
import pandas as pd
from config import *



class DB:

    def __init__(self, engine):
        self._engine = engine
        self._conn = pymysql.connect(host=self._engine[‘host‘],
                                     user=self._engine[‘user‘],
                                     password=self._engine[‘password‘],
                                     port=self._engine[‘port‘],
                                     database=self._engine[‘database‘] ,
                                     charset=‘utf8‘)

    def insert(self, dataframe, table):
        """插入dataframe类型数据到指定表"""
        with self._conn.cursor() as cursor:
            col = ‘,‘.join(dataframe.columns)
            par = ‘,‘.join([‘%s‘] * dataframe.shape[1])
            keys = ‘,‘.join([f‘{i} = values({i})‘ for i in dataframe.columns])
            sql = f‘insert into {table} ({col}) values({par}) on duplicate key update {keys};‘
            cursor.executemany(sql, [tuple(i) for i in df.values])
            try:
                self._conn.commit()
                msg = f"""数据写入成功:共{dataframe.shape[0]}条数据写入{table}表!!!"""
            except Exception as err:
                self._conn.rollback()
                msg = f"数据写入失败!!!"
                raise Exception(msg)
        return msg

    def select(self, sql, kind=True):
        """查询数据:kind为True表示需要输出数据"""
        with self._conn.cursor() as cursor:
            try:
                cursor.execute(sql)
                if kind:
                    result = cursor.fetchall()
                    cols = cursor.description
                    return pd.DataFrame(result, columns=[i[0] for i in cols])
                else:
                    return self._conn.commit()
            except Exception as err:
                self._conn.rollback()
                msg = f"ERROR - {self._engine[‘host‘]} session init failed: {err}" + "\n"
                raise Exception(msg)

    def tables(self):
        """查看表"""
        sql = "show tables;"
        return self.select(sql)

    def drop(self, table):
        """删除表"""
        sql = f"drop table {table};"
        return self.select(sql, kind=False)

    def delete(self, table, tag=‘1=1‘):
        """删除数据,可以按条件删除"""
        sql = f"delete from {table} where {tag};"
        return self.select(sql, kind=False)

    def desc(self, table):
        """查看表结构"""
        sql = f"desc {table};"
        return self.select(sql)

    def process(self):
        """查看数据库进程"""
        sql = r"show processlist;"
        return self.select(sql)

    def kill(self, process_id):
        """关闭数据库进程"""
        sql = f"kill {process_id};"
        return self.select(sql, kind=False)

    def use(self, database):
        """切换数据库"""
        sql = f"use {database};"
        return self.select(sql, kind=False)

    def close(self):
        self._conn.close()

    def create_sql(self, table):
        sql = f"""show create table {table}"""
        dataframe = self.select(sql)
        return print(dataframe[‘Create Table‘][0])


if __name__ == ‘__main__‘:
    db = DB(rds_v1)
    df = db.tables()

pymysql操作数据库【进阶版】

原文:https://www.cnblogs.com/blog-for-me/p/14778334.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!