请教这个 concurrent.futures 多进程处理 SQL 队列,为什么只处理了第一个,就停了下来?

2020-11-01 16:33:34 +08:00
 qazwsxkevin
# coding=utf-8
import time
import pymysql
import MySQLdb
import AnalyFunc 

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, wait, ALL_COMPLETED, as_completed
from dbutils.pooled_db import PooledDB

GloSQLQueueBreakFlag = 1 # 处理队列退出判断信号

# 处理 SQL 队列
def procSQLcmd(sqlqueue):
    import time
    from dbutils.pooled_db import PooledDB
    import pymysql

    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=80,  # 连接池允许的最大连接数,0 和 None 表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0 和 None 不限制
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待; False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None 表示无限制
        setsession=[],  # 开始会话前执行的命令列表。
        ping=0,  # ping MySQL 服务端,检查是否服务可用。
        host='192.168.89.48',
        port=3306,
        user='root',
        password='root123',
        database='eee',
        charset='utf8'
    )

    while True:
        while not sqlqueue.empty():
            print(GloSQLQueueBreakFlag:', str(GloSQLQueueBreakFlag))
            sqlTask = sqlqueue.get()
            DBconn = POOL.connection()
            cur = DBconn.cursor()
            print("sqlTask:",sqlTask)
            ses = cur.execute(sqlTask)
            cur.close()  # or del cur
            DBconn.close()  # or del db
            time.sleep(0.5)
        if GloSQLQueueBreakFlag == 0:
            break
        else:
            time.sleep(1)
    return

if __name__ == '__main__':
    from concurrent import futures
    from multiprocessing import Manager
    from teFunc import TranDicttoSQLcmd

    SQLQueue = Manager().Queue()
    ProcessSQLQueue = futures.ProcessPoolExecutor(max_workers=1)
    ProcessSQLQueueRet = ProcessSQLQueue.submit(procSQLcmd,SQLQueue)

    # 导入测试数据,成为字典列表
    teList = eval(AnalyFunc.ReadFiletoStr('h:/testdict.dict'))

    for i in teList:
        print(i)
        TranDicttoSQLcmd('testSheet', i, SQLQueue)  # 把字典转换成 MySQL 的 INSERT 语句,同时把语句作为任务交到全局队列,交由独立进程的 procSQLcmd 函数去处理


    # 最后确保队列全部弄完,才完全退出整个程序
    waitSQLQueue = True
    while waitSQLQueue == True:
        time.sleep(0.5)
        SQLQueueCount = SQLQueue.qsize()
        print(f'SQLQueue 队列还有:{SQLQueueCount} 未处理完.')
        if SQLQueueCount == 0:
            GloSQLQueueBreakFlag = 0
            waitSQLQueue = False
# 将字典转换成 SQL 语句
def TranDicttoSQLcmd(tblName,DictObj,SQLQueue,printSQL=False):
    import time
    # 组合字段
    FiledStr = ''
    ValueStr = ''
    SQLText = ''

    # 生成 INSERT 语句
    SQLcmd = "INSERT INTO %s ({}) VALUE ({});" % tblName

    # 单一字典
    if isinstance(DictObj, dict):
        FiledStr = ''
        ValueStr = ''
        for k, v in DictObj.items():
            if v == None:
                continue
            FiledStr = FiledStr + "`%s`" % (k) + ','
            ValueStr = ValueStr + "'%s'" % (str(v)) + ','
        FiledStr = FiledStr[:-1]
        ValueStr = ValueStr[:-1]
        SQLText = SQLcmd.format(FiledStr, ValueStr)
        if printSQL:
            print('TranDicttoSQLcmd:',SQLText)
        if SQLQueue:
            SQLQueue.put(SQLText)
        return SQLText

    # 字典列表
    if isinstance(DictObj, list):
        kvDict = {}
        ccount = 0
        for i in DictObj:
            FiledStr = ''
            ValueStr = ''
            ccount += 1
            for k,v in i.items():
                if v == None:
                    continue
                FiledStr = FiledStr + "`%s`" % (k) + ','
                ValueStr = ValueStr + "'%s'" % (str(v)) + ','
            FiledStr = FiledStr[:-1]
            ValueStr = ValueStr[:-1]
            SQLText = SQLcmd.format(FiledStr, ValueStr)
            if printSQL:
                print('TranDicttoSQLcmd:',SQLText)
            if SQLQueue:
                SQLQueue.put(SQLText)
    return

运行过程:

SQLQueue 队列还有:538
GloSQLQueueBreakFlag: 1
sqlTask: INSERT INTO testSheet (`tename`,`amount`,`weight`) VALUE ('椰子','218','72170');
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
#一直刷下去重复

检查 testSheet 表,一个椰子内容被插入,处理 SQL 语句队列的进程函数工作不正常是什么原因呢?

2036 次点击
所在节点    Python
1 条回复
abucus
2020-11-06 02:51:44 +08:00
尝试把 `procSQLcmd` 方法里的 `break` 语句注释掉看看?

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/720725

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX