github: https://github.com/chenquan/arkflow
高性能 Rust 流处理引擎,提供强大的数据流处理能力,支持多种输入输出源和处理器。
# 克隆仓库
git clone https://github.com/chenquan/arkflow.git
cd arkflow
# 构建项目
cargo build --release
# 运行测试
cargo test
config.yaml
:logging:
level: info
streams:
- input:
type: "generate"
context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
interval: 1s
batch_size: 10
pipeline:
thread_num: 4
processors:
- type: "json_to_arrow"
- type: "sql"
query: "SELECT * FROM flow WHERE value >= 10"
- type: "arrow_to_json"
output:
type: "stdout"
./target/release/arkflow --config config.yaml
ArkFlow 使用 YAML 格式的配置文件,支持以下主要配置项:
logging:
level: info # 日志级别:debug, info, warn, error
streams: # 流定义列表
- input: # 输入配置
# ...
pipeline: # 处理管道配置
# ...
output: # 输出配置
# ...
ArkFlow 支持多种输入源:
示例:
input:
type: kafka
brokers:
- localhost:9092
topics:
- test-topic
consumer_group: test-group
client_id: arkflow
start_from_latest: true
ArkFlow 提供多种数据处理器:
示例:
pipeline:
thread_num: 4
processors:
- type: json_to_arrow
- type: sql
query: "SELECT * FROM flow WHERE value >= 10"
- type: arrow_to_json
ArkFlow 支持多种输出目标:
示例:
output:
type: kafka
brokers:
- localhost:9092
topic: output-topic
client_id: arkflow-producer
streams:
- input:
type: kafka
brokers:
- localhost:9092
topics:
- test-topic
consumer_group: test-group
pipeline:
thread_num: 4
processors:
- type: json_to_arrow
- type: sql
query: "SELECT * FROM flow WHERE value > 100"
- type: arrow_to_json
output:
type: kafka
brokers:
- localhost:9092
topic: processed-topic
streams:
- input:
type: "generate"
context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
interval: 1ms
batch_size: 10000
pipeline:
thread_num: 4
processors:
- type: "json_to_arrow"
- type: "sql"
query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor"
- type: "arrow_to_json"
output:
type: "stdout"
1
chenquan OP |
![]() |
2
des 21 小时 15 分钟前
基于 yaml 配置文件配置的?感觉迟早是个麻烦的事,除非你一次配置好就不改了
|
4
3085570450tt 13 小时 33 分钟前
已 star. 同 2 楼一样,yaml 配置可能会劝退一部分人;
对比其他类似的流引擎,比如 risingwave/arroyo, 发现似乎轻量是它的优点?目前暂时没有 benchmark 可用参考的,同时其他两个工具支持的数据源格式更多; 计划有没有其他客户端 sdk, 比如 node/python 等等,与引擎本身进行交互等 如果暂时支持不了更多数据源,是否将数据源 input/output 这一块,抽离成扩展的方式,让用户自定义更好呢? |
5
chenquan OP @3085570450tt
1. 目前其实 yaml 的配置也很简单的,但是也有一定的门槛(适合用于一定开发经验的人),这个在后期会进一步优化哈。 2. input 、output 、processor 已经支持扩展( https://github.com/chenquan/arkflow/pull/75 ),会尽快完善这块的内容。 |
6
chenquan OP @3085570450tt 欢迎继续关注 arkflow 、有什么想法可以畅所欲言。
|