请教,从 FIFO 队列中取出 SQL 语句,写入到 MySQL,如何可以拉满拉爆性能?

2021-08-28 19:17:06 +08:00
 uti6770werty

情况: 在 main()

    from multiprocessing import Manager
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    
    # 建立全局变量字典
    GOLVAR = Manager().dict()
    # SQL 处理队列
    SQLQueue = Manager().Queue()
     
    # 处理 SQL 队列功能,一个单独进程在运行
    ProcessSQLQueue = futures.ProcessPoolExecutor(max_workers=1)
    # 启动
    ProcessSQLQueueRet = ProcessSQLQueue.submit(procSQLcmd, SQLServerInfo, SQLQueue, GOLVAR)
     
    def AA(someData,sqlqueue):
     	#略
     	XXX
     	sqlCommand = XXX
     	sqlqueue.put(sqlCommand)
     	retrun
    
    def BB(someData,sqlqueue):
     	#略,和 AA 结构一样,最后往队列里 put(sqlCommand)
     	sqlqueue.put(sqlCommand)
     	retrun
     		
    def CC(someData,sqlqueue):
     	#略
     	sqlqueue.put(sqlCommand)
     	retrun    
     		
   # 开动制造
    while True:
  	# AA,BB,CC,DD 等处理函数按顺序,循环制造 SQL 语句,运行 AA,BB,CC,DD 等处理数据的函数处理上,其实几乎都不怎么占 CPU,I/O,最后向 Manager().Queue() put 入大量 SQL 语句
     		
     	# 进去的 SQL 语句只有四种,
     	# INSERT INTO tblname (x) VALUE (x);
     	# INSERT INTO ... SELECT FROM XXX(最复杂也就嵌了 3 层);
     	# UPDATE SET...
     	# DELETE FROM...
     		
     	# SQLQueue 量高的时候 1 秒进 4 万条,低的时候,200 秒不进 1 条
       	time.sleep(100)

# 处理 SQL 队列
def procSQLcmd(sqlinfo, sqlqueue, golvar):
    import time
    import datetime
    from dbutils.pooled_db import PooledDB
    import pymysql
    from concurrent.futures import ThreadPoolExecutor
    from MYFunc import SQLcmdData
    from myFunc import colrRedB
    from myFunc import TranDicttoSQLcmd

    from warnings import filterwarnings
    filterwarnings("error", category=pymysql.Warning)

    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=600,  # 连接池允许的最大连接数,0 和 None 表示不限制连接数
        mincached=5,  # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0 和 None 不限制
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待; False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None 表示无限制
        setsession=[],  # 开始会话前执行的命令列表。
        ping=1,  # ping MySQL 服务端,检查是否服务可用。
        host=sqlinfo['ip'],
        port=sqlinfo['port'],
        user=sqlinfo['user'],
        password=sqlinfo['password'],
        database=sqlinfo['database'],
        charset=sqlinfo['charset']
    )

    DBconn = POOL.connection()
    
    def exeCu(conn, sqltext):
        try:
            cur = conn.cursor()
            cur.execute(sqltext)
            # cur.commit()
            cur.close()
        except pymysql.Warning as e:
            # print(f'#detial:{str(e)}\n',colrRedB(f"SQL ERR: {sqltext}"))
            sqlsqlcmd = sqltext.replace("'","\\'").replace('"','\\"')
            resonsql = str(e).replace("'","\\'").replace('"','\\"')
            SQLErrorDict = {'sqlcmd': sqlsqlcmd,
                            'reson': resonsql,
                            'UpdateTime': datetime.datetime.now().replace(microsecond=0)}
            SQLCmd = TranDicttoSQLcmd('MYSQLERRLog', SQLErrorDict, None)
            SQLcmdData(sqlinfo, SQLCmd)
        return

    while True:
        if sqlqueue.qsize() == 0:
            # 开关
            if golvar['stopsqlflag'] == False:
                time.sleep(2)
                break

        # SQL 语句执行,必须按队列 FIFO 顺序写入
        while not sqlqueue.empty():
            with ThreadPoolExecutor(1) as executor:
                executor.submit(exeCu, DBconn, sqlqueue.get())

    DBconn.close()
    return		
请教问题:
1 、这样的设计,写入每秒是 800 ~ 2500 条左右,虽然能做到对 MySQL 服务器写入浪涌的削峰填谷,但 SQLQueue 在峰值的时候,很容易一下就超了 17 万,太多的未写入,也影响了 main()的大循环
2 、从 MySQL 的服务器的性能判断来看,
SHOW STATUS WHERE (Variable_name like '%thre%' OR Variable_name like '%conn%' OR Variable_name like '%cache%');
SHOW PROCESSLIST; 
 MySQL 服务器其实跟睡着了没区别,瞬时链接数 3,4 个,没有感受到什么事情(是对 PooledDB 的用法有问题?)
3 、以前以为是服务器 I/O 的问题,换 8 核 16 线程 CPU 的机器,换上 SSD,内存 64GB,my.cnf 的 cache 调到 65%,都没有太大改善
4 、请教如何调整做法,从 SQLQueue 取出 SQL 语句怼服务器,可以拉满拉爆?
5 、小范围,小应用,上大工业架构的方式就算了,折腾不起。。。
2397 次点击
所在节点    Python
10 条回复
BBCCBB
2021-08-28 20:07:18 +08:00
看你这个貌似没用到批量写入? 可以尝试批量
heyjei
2021-08-28 20:37:14 +08:00
代码没细看,但思路其实很简单,攒一波数据,到 1 千条或者 1 千条没到但 1 秒钟到了,再批量输入。如果批量写入的方案还是不满足,可以把数据写入到文件里,然后再定时调用 load data infile,load data infile 的写入速度可以达到磁盘的最大 IO 速度(前提是使用 MyISAM,并且没有索引)
heyjei
2021-08-28 20:45:00 +08:00
还有一种改动最小的一种方式:

我们的 SQL 语句是 insert into table_name (column1, column2) values (value1, value2)

在下面的语句中,你不要把整个语句 put 进去,把 (value1, value2) put 进去
sqlqueue.put(sqlCommand)

在下面的语句,get 之后,不要立即执行,攒够 1000 个数据,或者 1 秒超时,然后拼接 SQL 成完整的语句并执行。
# SQL 语句执行,必须按队列 FIFO 顺序写入
while not sqlqueue.empty():
with ThreadPoolExecutor(1) as executor:
executor.submit(exeCu, DBconn, sqlqueue.get())
uti6770werty
2021-08-28 21:10:00 +08:00
@BBCCBB

@heyjei

队列里,不全是 INSERT INTO 。。,也许还有偶然一两个 ALERT 也不一定,要按 FIFO 顺序,所以就不好套批量模板了。。。

by the way,有试过 sqlcmd + ";" + sqlcmd,这样操作过,但似乎 PooledDB.conn.cursor().excute 不支持这种多语句组装命令执行? 前几天有试过,当时没成功,没研究下去,后面去研究如何高并发去了,结果更迷糊,就这个场合用,现有的高并发非常折腾。。。
liprais
2021-08-28 21:19:46 +08:00
你的 mysql 服务器有几个核心?
600 个 connection 太多了
uti6770werty
2021-08-28 21:37:09 +08:00
上面忘了说一个事情,就是就算是峰值 17W 条数据里也好,平时 5,6 千条也好,很多时候都队列的数据,是表里已经有的了,表的索引机制已经避免了重复插入数据,所以存表里的数据量其实不多的。。。

@liprais 8 核,16 线程,CentOS 6 + MySQL 5.5,按月份分表,最多的表数据不过 800 万
uti6770werty
2021-08-28 21:40:36 +08:00
@liprais PooledDB 的 connection 很奇怪的,它这里 600 只是最高允许 600 而已,我现在只是一台数据处理电脑向 MySQL 写数据而已,SHOW PROCESSLIST 看,也就 7,8 条连接
noparking188
2021-08-29 09:01:51 +08:00
所以这段代码真是生产上用的嘛?
看这段代码是用线程做并发,线程开销比较大,这种 IO 密集任务不大行,可以换协成程测下效果,更轻量级,几年前写过类似脚本用的 gevent,ayncio 我没研究过,楼主也可以看看,个人感觉主要是想办法提高并发处理能力吧,和 MySQL 没啥关系
说的不对还望指正
noparking188
2021-08-29 09:15:39 +08:00
个人感觉比较简单的做法就是换 redis 做队列,生成 SQL 单独一个程序跑,消费队列数据发 SQL 请求的一个程序,要提高并发起简单地多个进程就行了,多进程+多线程(协程)的方式,用 supervisor 托管更好
当然这样的方案是建立在示例代码用线程并发造成网络 IO 请求瓶颈的猜测上
todd7zhang
2021-08-30 10:16:35 +08:00
没动啊,既然严格要求 FIFO, 为啥去执行 SQL 的时候,还要开 ThreadPoolExecutor ?这种情况,从 sqlqueue 拿出来虽然是顺序的,但是感觉执行过去就可能乱序啊,毕竟 while 里面在不停的开 executor 。

我感觉都不用 POOL, 就一个 conn 不停的执行 sql 就好了?
with conn.cursor() as cr:
while not sqlqueue.empty():
cr.execute(sqlqueue.get())

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/798537

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX