V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
Tdy95
V2EX  ›  程序员

[求助] Python 调用 dll 的时候发现代码运行被阻塞了?

  •  
  •   Tdy95 ·
    mydaoyuan · 2024-08-26 09:36:41 +08:00 · 3077 次点击
    这是一个创建于 375 天前的主题,其中的信息可能已经有所发展或是发生改变。

    业务说明

    使用 ctypes 对接 C++的 dll 接口,封装为 websocket 接口方便网络调用。使用了 Redis 的 PUB/SUB 来监听服务端主动发起的消息通知。

    使用了 asyncio 库做异步任务来处理 websocket 的请求通知,使用 aioredis 来进行管理 Redis 连接。

    案例代码

    入口代码如下:

    async def main():
        redis = aioredis.from_url(f'redis://:{redis_password}@{redis_host}:{redis_port}', encoding='utf-8')
        redis_task = asyncio.create_task(redis_subscriber_start_websocket(redis))
        server_task = websockets.serve(msdk_handler, "0.0.0.0", 8765)
        asyncio.create_task(monitor_loop())
        print("WebSocket 服务器启动在 ws://localhost:8765:redis 地址: ", redis_host, redis_port)
        await asyncio.gather(server_task, redis_task)
        
    
    if __name__ == "__main__":
        asyncio.run(main())
    
    

    大致的 websocket 处理流程如下:

           try:
                async for message in websocket:
                    data = json.loads(message)
                    # print(f"收到 websocket 消息: {data}")
                    await messageHandler(data, connected[random_id_str])
                print("WebSocket 连接正常关闭")  # 添加这行来确认循环是否正常结束
                await clearnWebscoket(random_id_str)
                
         
       async def messageHandler(data, connected):
          if data['action'] == 'something':
          # 切换位置
          results = await change_something(connected['client_id'], data)
          await connected['websocket'].send(json.dumps(results))
    
    	async def change_something(client_id):
    	    future = asyncio.Future()
    	    futures_change_chara_pos[client_id] = future
    	    # 调用 DLL 中的更改角色位置函数
    	    sdk.SDK_change_something(c_char_p(client_id.encode('utf-8')),callback_instance)    
    	    while not future.done():
    	        await asyncio.sleep(0)
    	    result = await future
    	    return result
    	    
    在 callback_instance 回调中,我会设置 future 的内容。
    
    def set_futures_status(client_id, futures_obj, data):
        if client_id in futures_obj:
            future = futures_obj.pop(client_id)
            if not future.done():
                future.set_result(data)
    

    问题

    对项目的异步和 Redis 消息有点不理解。

    1 、有一个播放音频的 C++接口,会频繁触发回调函数,我发现在播放音频的时候,接收了其他的 websocket 消息,发现好像没有处理,处理函数的日志没有打印出来 [偶现正常] 。不知道是不是我异步有问题,阻塞了主线程导致的。(作为一个前端开发,python 的异步好难懂,大佬们有推荐的书籍吗)

    2 、Redis 消息一会后,就不会接收到订阅的消息。 消息的发布和接收频道一致,并且重启 python 后服务正常。

    期望

    能帮忙解决一下目前碰到的 2 个问题,特别是异步的代码,我不知道自己写的是否正确。

    	    while not future.done():
    	        await asyncio.sleep(0)
    	    result = await future
    

    上面的代码就是我发现 await future 后会一直阻塞主进程代码运行。所以在 AI 的提示下,加上了代码后正常运行。但是 websocket 消息处理中的 await 又不会阻塞整个主进程运行,就很疑惑。

    报酬

    目前预算 500 ,可谈。代码地址: https://github.com/mydaoyuan/pythonAndDll

    联系方式:d29zaGl0ZHkxMjM0NTY=

    18 条回复    2024-08-29 11:57:55 +08:00
    zombiecong
        1
    zombiecong  
       2024-08-26 09:50:09 +08:00   ❤️ 1
    python 的异步并不是多线程和多进程,需要并发还要用 ThreadPoolExecutor 或者 ProcessPoolExecutor
    https://docs.python.org/3/library/concurrent.futures.html
    mightybruce
        2
    mightybruce  
       2024-08-26 10:40:46 +08:00   ❤️ 1
    调用 C++的 dll 接口 是会阻塞的,
    asyncio 是协程,线程阻塞了, 协程肯定是会阻塞的,

    sdk 相关的代码单独测试吧,可以搞一个并发多进程队列, 发数据扔给队列就继续处理接受数据,sdk 相关的代码不断从队列里面取数据。
    pursuer
        3
    pursuer  
       2024-08-26 10:44:11 +08:00   ❤️ 1
    如果在使用 asyncio 的过程中使用了多线程回调时需要注意,asyncio 中很多 API 并不是线程安全的,不能跨线程调用

    比如 set_futures_status 这个函数就应该通过如下模式调用

    asyncio.run_coroutine_threadsafe(set_futures_status(a1,a2,a3),eventloop)
    pursuer
        4
    pursuer  
       2024-08-26 10:49:45 +08:00   ❤️ 1
    顺便一提,此时 set_futures_status 应该改成 async def

    eventloop 是你创建 future 的线程的 eventloop ,可以通过 get_event_loop()获取
    这样可以解决无法用 future 唤醒导致的阻塞的问题。

    while not future.done():
    --await asyncio.sleep(0)
    result = await future
    这种写法就是个坑,相当于轮循,完全没发挥协程的优势,不建议使用这种写法
    zagfai
        5
    zagfai  
       2024-08-26 11:02:34 +08:00   ❤️ 1
    @pursuer await sleep 并没有问题,在有其他协程需要用 cpu 的时候主动让出并不影响多少性能
    fds
        6
    fds  
       2024-08-26 11:05:56 +08:00   ❤️ 1
    python 感觉还是写阻塞的代码比较流畅,异步是后面塞进来的,得对底层多一些了解。阻塞的逻辑得单独放在个线程里处理。要异步不能直接用 nodejs 吗?虽然 nodejs 写不好也可能阻塞,但毕竟设计之初就是异步模式,大部分常用 IO 也都包装好了。前端上手 js 也熟练些。
    pursuer
        7
    pursuer  
       2024-08-26 11:09:47 +08:00   ❤️ 1
    @zagfai 对应用本身影响不大,但是会额外损耗其他进程的 CPU 资源,能用回调唤醒自然优先回调唤醒
    Tdy95
        8
    Tdy95  
    OP
       2024-08-26 12:38:52 +08:00
    @fds node 对接 dll 和 so 的能力还是太弱了,python 这块方便很多。Python 之前么咋用过,全靠 AI 磕磕碰碰写出来的代码。结果被异步搞自闭了 T T
    fds
        9
    fds  
       2024-08-26 13:55:13 +08:00
    @Tdy95 哦 这样呀。其实就像前面一些回复说的把 dll 调用扔到个线程就行。我也不太熟悉,问了下 gpt ,给出的代码是

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=4)

    def dll_call(client_id):
    # 同步调用 DLL 函数
    sdk.SDK_change_something(c_char_p(client_id.encode('utf-8')), callback_instance)
    # 假设这里返回结果
    return "result from dll"

    async def change_something(client_id):
    loop = asyncio.get_running_loop()
    # 在后台线程中执行 DLL 调用,避免阻塞事件循环
    result = await loop.run_in_executor(executor, dll_call, client_id)
    return result

    确实就跟前面几楼的回复一样呢,也算是挺清晰的。
    sujin190
        10
    sujin190  
       2024-08-26 14:12:07 +08:00
    这还不简单,asyncio 的运行线程和 dll 运行线程不能是同一个啊,否则一个线程同一个时刻只能干一件事情,asyncio await 检测到有 io 阻塞会切换 python 栈帧,但是如果你是重 cpu 一直在计算 await 也会阻塞的
    LinePro
        11
    LinePro  
       2024-08-26 16:02:01 +08:00
    问一下,印象中 Future 是可以直接 await 等待结果的。那 while not future.done() 这种轮询式的写法意义何在?
    Tdy95
        12
    Tdy95  
    OP
       2024-08-26 16:23:38 +08:00
    @LinePro 我是发现在调用 dll 后,5s 后返回结果。 但是再等 5s 后才会执行 await 后的代码。我是让 AI 给出的 while not future.done() 代码
    Tdy95
        13
    Tdy95  
    OP
       2024-08-26 16:24:16 +08:00
    @fds 谢谢老哥,我去试试看
    Tdy95
        14
    Tdy95  
    OP
       2024-08-29 09:34:58 +08:00
    @pursuer 我已经修改了,发现 future 一直没变化。
    ```
    def callback_speak_by_audio(code, status, frame_id, client_id):
    client_id = client_id.decode('utf-8') # 解码客户端 ID
    status = json.loads(status.decode('utf-8')) # 假设 status 是 UTF-8 编码的字符串
    print(f"callback_speak_by_audio=====run, {client_id in futures_speak_by_audio_stream}")
    if status["data"]["FrameId"] == -1:
    if code == MSDKStatus.MSDK_SUCCESS_SPEAK_BY_AUDIO_FINISH.value:
    print(f"语音说话: {code}, 客户端 ID: {client_id}")
    asyncio.run_coroutine_threadsafe(set_futures_status_async(client_id, futures_speak_by_audio_stream, {"code": 204, "status": status, "name": "speak_by_audio", "success": True, "client_id": client_id}), websocketAll[client_id]['main_loop'])


    async def set_futures_status_async(client_id, futures_obj, data):
    if client_id in futures_obj:
    future = futures_obj.pop(client_id)
    if not future.done():
    future.set_result(data)


    ```

    外部调用形式为:
    ```
    if connected["audio_future"] is None:
    feature = asyncio.Future()
    connected["audio_future"] = feature
    print("Creating sendAudioEndData task")
    asyncio.create_task(sendAudioEndData(connected, feature))

    async def sendAudioEndData(connected, feature):
    print("发送音频结束数据")
    try:
    result = await feature # 等待 feature 完成
    print(f"发送音频结束数据: {result}")
    await connected['websocket'].send(json.dumps(result))
    except Exception as e:
    print(f"发送数据时发生错误: {e}")
    finally:
    connected["audio_future"] = None # 确保音频 future 被正确重置

    ```

    麻烦大佬再指点一下,是哪里不对呢?从日志结果看,“语音说话:”的日志已经执行。 但是后续的 sendAudioEndData 的 print(f"发送音频结束数据: {result}")没有执行。
    Tdy95
        15
    Tdy95  
    OP
       2024-08-29 09:45:39 +08:00
    @pursuer 代码乱码了。我贴个图
    [Imgur]( https://imgur.com/Y46UeJ4)


    [Imgur]( https://imgur.com/2sNE2LK)
    pursuer
        16
    pursuer  
       2024-08-29 10:23:37 +08:00
    @Tdy95 不知道你的 websocketAll[client_id]['main_loop']是哪里来的,future 正常唤醒是要求你的 future 创建和 set_result 在同一个线程/event_loop 中执行,要看着两个地方。
    Tdy95
        17
    Tdy95  
    OP
       2024-08-29 10:43:26 +08:00
    @pursuer 我是直接在 message_handle.py 文件创建了一个全局变量。main_loop = asyncio.get_event_loop()
    pursuer
        18
    pursuer  
       2024-08-29 11:57:55 +08:00
    @Tdy95 我看不到你怎么赋值 websocketAll[client_id]['main_loop']。也没法判断 futures_obj[client_id] 也不知道是不是和 connected["audio_future"]是同一个对象。或者在我上面说的两个地方打印下 get_running_loop() 和 future 看看是不是一致的
    关于   ·   帮助文档   ·   自助推广系统   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   905 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 19:56 · PVG 03:56 · LAX 12:56 · JFK 15:56
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.