请教, websockets 模块起服务, websockets.serve 的方法问题。

2023-10-23 18:50:13 +08:00
 qazwsxkevin
import datetime
import random
import string
import asyncio
import time

import websockets

from multiprocessing import Manager
from concurrent import futures

# 忽略警告
import warnings
warnings.filterwarnings("ignore")

strLen = 30

def putmsg(que, gvar):
    cc = 0
    while True:
        cc += 1

        if gvar['flag'] == True:
            break

        ranStr = ''
        for s in range(strLen):
            ranStr = ranStr + random.choice(string.ascii_letters + string.digits)

        # slTime = random.uniform(0.01,0.2)

        logStr = str(cc) + ' ' + "{:.2f}".format(slTime) + ' ' + str(datetime.datetime.now().replace(microsecond=0)) + ' ' + ranStr
        # print(logStr)
		
        # test
        # print(cc, '#', que.qsize())

        que.put(logStr)

        # time.sleep(slTime)
        time.sleep(1.5)

def wsock(queu, gvar):
    loop = asyncio.get_event_loop()
    async def stoploop():
        loop.stop()

    # Maintain a list of connected clients
    connected_clients = set()

    async def register(websocket):
        # Add a new client to the list
        connected_clients.add(websocket)
        print('connected_clients.add(websocket)')

    async def unregister(websocket):
        # Remove a client from the list
        connected_clients.remove(websocket)
        print('connected_clients.remove(websocket)')

    async def broadcast(message):
        # Send a message to all connected clients
        if connected_clients:
            await asyncio.gather(*(client.send(message) for client in connected_clients))

    async def echo(websocket, que=queu):
        await register(websocket)

        while True:
            if queu.qsize():
                msgStr = queu.get()
                if connected_clients:
                    try:
                        await asyncio.gather(*(client.send(msgStr) for client in connected_clients))
                    except Exception as e:
                        print(e) # echo:received 1001 (going away); then sent 1001 (going away)
                        break
            else:
                asyncio.sleep(0.3)


        async for message in websocket:
            # Broadcast the received message to all clients
            if message == 'stop':
                gvar['flag'] = True
                await stoploop()
            await broadcast(message)

        await unregister(websocket)


    start_server = websockets.serve(echo, "172.17.0.2", 25299)

    asyncio.set_event_loop(loop)
    # loop.create_task(start_server)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()

if __name__ == '__main__':
    # 队列
    msgQue = Manager().Queue()
    # 全局变量
    glovar = Manager().dict()

    # 启停开关
    glovar['flag'] = False

    # 处理进程
    proc = futures.ProcessPoolExecutor(max_workers=2)

    wsockRet = proc.submit(wsock, msgQue, glovar)
    putmsgRet = proc.submit(putmsg, msgQue, glovar)

问题是 websocket.serve 使用 echo 方法作为 handle ,
只有在 websocket 接口有事件的时候,才会调用 echo 进行处理,(被动式)

echo 的被动方法,队列里的日志越来越多,
想有一个永久循环,如果有 client(s),send 取出的队列内容,没有 client ,取出就 pass 了,
websocket.serve 被动调用不适合这个场合,看官方也没有更好的提示,
请教大家这里怎么换个方式实现呢?

368 次点击
所在节点   WebSocket
2 条回复
julyclyde
2023-10-24 16:01:58 +08:00
看了一下这个库的说明,感觉好奇怪
为什么把“处理”和“IO”合在一个库里面啊
qazwsxkevin
2023-10-24 16:48:19 +08:00
@julyclyde #1 是啊,官网上异步和线程的范例逻辑思想,是做了很高的包装,要用在其它场景,很难适合,打算过几天有时间再去仔细地看看 python websocket 官网的 API renference ,看看能不能粒度化用在我的场景上。。。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/984626

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX