import datetime
import random
import string
import asyncio
import time
import websockets
from multiprocessing import Manager
from concurrent import futures
# 忽略警告
import warnings
warnings.filterwarnings("ignore")
strLen = 30
def putmsg(que, gvar):
cc = 0
while True:
cc += 1
if gvar['flag'] == True:
break
ranStr = ''
for s in range(strLen):
ranStr = ranStr + random.choice(string.ascii_letters + string.digits)
# slTime = random.uniform(0.01,0.2)
logStr = str(cc) + ' ' + "{:.2f}".format(slTime) + ' ' + str(datetime.datetime.now().replace(microsecond=0)) + ' ' + ranStr
# print(logStr)
# test
# print(cc, '#', que.qsize())
que.put(logStr)
# time.sleep(slTime)
time.sleep(1.5)
def wsock(queu, gvar):
loop = asyncio.get_event_loop()
async def stoploop():
loop.stop()
# Maintain a list of connected clients
connected_clients = set()
async def register(websocket):
# Add a new client to the list
connected_clients.add(websocket)
print('connected_clients.add(websocket)')
async def unregister(websocket):
# Remove a client from the list
connected_clients.remove(websocket)
print('connected_clients.remove(websocket)')
async def broadcast(message):
# Send a message to all connected clients
if connected_clients:
await asyncio.gather(*(client.send(message) for client in connected_clients))
async def echo(websocket, que=queu):
await register(websocket)
while True:
if queu.qsize():
msgStr = queu.get()
if connected_clients:
try:
await asyncio.gather(*(client.send(msgStr) for client in connected_clients))
except Exception as e:
print(e) # echo:received 1001 (going away); then sent 1001 (going away)
break
else:
asyncio.sleep(0.3)
async for message in websocket:
# Broadcast the received message to all clients
if message == 'stop':
gvar['flag'] = True
await stoploop()
await broadcast(message)
await unregister(websocket)
start_server = websockets.serve(echo, "172.17.0.2", 25299)
asyncio.set_event_loop(loop)
# loop.create_task(start_server)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
if __name__ == '__main__':
# 队列
msgQue = Manager().Queue()
# 全局变量
glovar = Manager().dict()
# 启停开关
glovar['flag'] = False
# 处理进程
proc = futures.ProcessPoolExecutor(max_workers=2)
wsockRet = proc.submit(wsock, msgQue, glovar)
putmsgRet = proc.submit(putmsg, msgQue, glovar)
问题是 websocket.serve 使用 echo 方法作为 handle ,
只有在 websocket 接口有事件的时候,才会调用 echo 进行处理,(被动式)
echo 的被动方法,队列里的日志越来越多,
想有一个永久循环,如果有 client(s),send 取出的队列内容,没有 client ,取出就 pass 了,
websocket.serve 被动调用不适合这个场合,看官方也没有更好的提示,
请教大家这里怎么换个方式实现呢?
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.