求助!关于流程引擎框架和执行自定义用户代码

223 天前
 xhatt510

公司有个项目,主要需求就是 能够执行用户上传的插件(就是 pyton2 代码),并且按照一定的流程执行,然后要支持集群模式。

先说问题:

  1. 写插件的人调试困难,基本上属于黑盒了
  2. 改插件代码之后再重新执行,特别的麻烦
  3. 我还需要写检查代码语法的函数(虽然利用 ast 模块已经实现了),比如:必须定义 main 函数,并且仅有一个入参。但是使用中还是有其他的一些问题,比如一些我没考虑到的情况出现。就会导致后面执行的时候报错或者根本无法载入进去。
  4. 插件可能会出现内存泄露的情况,导致服务区器存吃满。别人每次都会直接找我们,虽然是因为插件导致的。可能需要能自定监测插件是否出现内存泄露的办法。

具体是:

有个 web 界面上传插件,然后在界面上面配置几个插件执行。这个任务会按照设置好的周期执行。

插件的流程是具体的 为:查询数据--处理数据--发送数据

查询到的数据每次大概有 1000 条,占用 10 兆左右的内存,每天大概有 7000 万条数据
现在我的做法:
  1. 用户上传插件之后,各个机器收到 kafka 消息,然后用importlib将插件动态导入进内存中 以便后面调用
  2. 单独有个进程去执行查询数据的脚本
  3. 将查询到的数据通过 kafka 发送至处理数据的进程
  4. 处理数据的进程收到 kafka 消息 根据消息内要执行的处理数据插件发送数据插件 来分别按照流程调用插件
  5. 最后需要将最后一步的数据写进 ES ,并且通过 kafka 进行通知回调
上面为什么要用 importlib 载入到内存中,是因为一开始尝试用命令行直接 python xxx.py 执行插件。但是这个调用行为属于高频操作,每天大概要调用几百万次。每次重新开启 python 虚拟机,速度根本就跟不上。所以后来改成直接载入到内存,通过函数调用。

求助大佬们,有没有开源的框架能够胜任这种情况的?已经尽可能精简了,希望大佬们留下一点点意见。

1462 次点击
所在节点    程序员
11 条回复
musi
223 天前
打开一下思路
引入 faas ,想稳定就云厂商,想自建就找开源的
然后就变成了两个服务之间的交互 rpc/http 看喜好
xhatt510
223 天前
@musi 多谢大佬留言,我先去看看 faas 。
然后我们这个要给客户线下物理机部署。不能上云
Dongxiaohao
223 天前
没写过你这种流程引擎,但是有类似的户上传 Python 算法,给服务端去执行;
Python 那边起了一个 Flask 的 webserver ,提供 deploy 和 uninstall 和调用算法 predict 方法的接口。
大致流程就是 web 服务器部署算法之后,Java 把算法文件传给 Flask ,Flask 动态加载这个 Python 算法,存在内存中。
然后等待 Java 调用 predict 的请求就行了。
要和算法编写人员约定好算法文件的部分格式,比如主类名称,和方法名,不能乱写。
foolishcrab
223 天前
你看一下 perfect hq
SmiteChow
223 天前
核心问题:动态代码应该动态编译执行,而不是存为文件。以下是实战中的代码片段,供参考
```
code = compile(python_code, run_file_path, 'exec')
space = globals()
space['__builtins__'].update({
'asql_runtime': self.runtime,
'asql_types': data_types,
'asql_stdlib': stdlib,
})
func = FunctionType(code.co_consts[0], space)
```

其他的问题:
1. 调试,可以提供命令行 sdk
2. 动态编译执行已解决
3. 具体问题具体分析,语法检查目前实现方式没问题
4. 单独开进程去跑才能控制开销,一下是实战中的代码片段,供参考
```
recv_end, send_end = multiprocessing.Pipe(False)

process = multiprocessing.Process(target=self.execute_python_method, args=(send_end, func_name, *args))
process.start()

bag = {'process': process, 'timeout': False}
timer = threading.Timer(self.configure.hook_method_max_execute_time, self.terminate_python_method, args=(bag,))
timer.start()

process.join()

# 超时结束
if bag['timeout']:
raise HookMethodExecuteTimeOut(func_name)

# 在规定时间内结束
timer.cancel()
result, ok = recv_end.recv()
```
xhatt510
223 天前
@SmiteChow 感谢大佬留言
xhatt510
223 天前
@foolishcrab 好的,我去看看
xhatt510
223 天前
@Dongxiaohao 估计和我这个差不多的实现方式
imaple
223 天前
听着好像 xxljob 就能解决, 定时调度自定义 python 代码
xhatt510
223 天前
@imaple 多谢大佬,我去看看
ryulxy
223 天前
我做过的事情和这个有点像,是 C++开发的引擎里嵌入 Python 虚拟机载入用户编写的 Python 脚本执行,可以动态载入 Python 脚本,脚本修改后可以热重载不用重启,然后捕获 Python 脚本编译阶段的报错不运行,和游戏引擎脚本差不多

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1027343

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX