情况: 在 main()
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# 建立全局变量字典
GOLVAR = Manager().dict()
# SQL 处理队列
SQLQueue = Manager().Queue()
# 处理 SQL 队列功能,一个单独进程在运行
ProcessSQLQueue = futures.ProcessPoolExecutor(max_workers=1)
# 启动
ProcessSQLQueueRet = ProcessSQLQueue.submit(procSQLcmd, SQLServerInfo, SQLQueue, GOLVAR)
def AA(someData,sqlqueue):
#略
XXX
sqlCommand = XXX
sqlqueue.put(sqlCommand)
retrun
def BB(someData,sqlqueue):
#略,和 AA 结构一样,最后往队列里 put(sqlCommand)
sqlqueue.put(sqlCommand)
retrun
def CC(someData,sqlqueue):
#略
sqlqueue.put(sqlCommand)
retrun
# 开动制造
while True:
# AA,BB,CC,DD 等处理函数按顺序,循环制造 SQL 语句,运行 AA,BB,CC,DD 等处理数据的函数处理上,其实几乎都不怎么占 CPU,I/O,最后向 Manager().Queue() put 入大量 SQL 语句
# 进去的 SQL 语句只有四种,
# INSERT INTO tblname (x) VALUE (x);
# INSERT INTO ... SELECT FROM XXX(最复杂也就嵌了 3 层);
# UPDATE SET...
# DELETE FROM...
# SQLQueue 量高的时候 1 秒进 4 万条,低的时候,200 秒不进 1 条
time.sleep(100)
# 处理 SQL 队列
def procSQLcmd(sqlinfo, sqlqueue, golvar):
import time
import datetime
from dbutils.pooled_db import PooledDB
import pymysql
from concurrent.futures import ThreadPoolExecutor
from MYFunc import SQLcmdData
from myFunc import colrRedB
from myFunc import TranDicttoSQLcmd
from warnings import filterwarnings
filterwarnings("error", category=pymysql.Warning)
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=600, # 连接池允许的最大连接数,0 和 None 表示不限制连接数
mincached=5, # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
maxcached=5, # 链接池中最多闲置的链接,0 和 None 不限制
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待; False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None 表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=1, # ping MySQL 服务端,检查是否服务可用。
host=sqlinfo['ip'],
port=sqlinfo['port'],
user=sqlinfo['user'],
password=sqlinfo['password'],
database=sqlinfo['database'],
charset=sqlinfo['charset']
)
DBconn = POOL.connection()
def exeCu(conn, sqltext):
try:
cur = conn.cursor()
cur.execute(sqltext)
# cur.commit()
cur.close()
except pymysql.Warning as e:
# print(f'#detial:{str(e)}\n',colrRedB(f"SQL ERR: {sqltext}"))
sqlsqlcmd = sqltext.replace("'","\\'").replace('"','\\"')
resonsql = str(e).replace("'","\\'").replace('"','\\"')
SQLErrorDict = {'sqlcmd': sqlsqlcmd,
'reson': resonsql,
'UpdateTime': datetime.datetime.now().replace(microsecond=0)}
SQLCmd = TranDicttoSQLcmd('MYSQLERRLog', SQLErrorDict, None)
SQLcmdData(sqlinfo, SQLCmd)
return
while True:
if sqlqueue.qsize() == 0:
# 开关
if golvar['stopsqlflag'] == False:
time.sleep(2)
break
# SQL 语句执行,必须按队列 FIFO 顺序写入
while not sqlqueue.empty():
with ThreadPoolExecutor(1) as executor:
executor.submit(exeCu, DBconn, sqlqueue.get())
DBconn.close()
return
请教问题:
1 、这样的设计,写入每秒是 800 ~ 2500 条左右,虽然能做到对 MySQL 服务器写入浪涌的削峰填谷,但 SQLQueue 在峰值的时候,很容易一下就超了 17 万,太多的未写入,也影响了 main()的大循环
2 、从 MySQL 的服务器的性能判断来看,
SHOW STATUS WHERE (Variable_name like '%thre%' OR Variable_name like '%conn%' OR Variable_name like '%cache%');
SHOW PROCESSLIST;
MySQL 服务器其实跟睡着了没区别,瞬时链接数 3,4 个,没有感受到什么事情(是对 PooledDB 的用法有问题?)
3 、以前以为是服务器 I/O 的问题,换 8 核 16 线程 CPU 的机器,换上 SSD,内存 64GB,my.cnf 的 cache 调到 65%,都没有太大改善
4 、请教如何调整做法,从 SQLQueue 取出 SQL 语句怼服务器,可以拉满拉爆?
5 、小范围,小应用,上大工业架构的方式就算了,折腾不起。。。
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.