ArkFlow – 高性能 Rust 流处理引擎

4 天前
chenquan  chenquan

github: https://github.com/chenquan/arkflow

高性能 Rust 流处理引擎,提供强大的数据流处理能力,支持多种输入输出源和处理器。

特性

安装

从源码构建

# 克隆仓库
git clone https://github.com/chenquan/arkflow.git
cd arkflow

# 构建项目
cargo build --release

# 运行测试
cargo test

快速开始

  1. 创建配置文件 config.yaml
logging:
  level: info
streams:
  - input:
      type: "generate"
      context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
      interval: 1s
      batch_size: 10

    pipeline:
      thread_num: 4
      processors:
        - type: "json_to_arrow"
        - type: "sql"
          query: "SELECT * FROM flow WHERE value >= 10"
        - type: "arrow_to_json"

    output:
      type: "stdout"
  1. 运行 ArkFlow:
./target/release/arkflow --config config.yaml

配置说明

ArkFlow 使用 YAML 格式的配置文件,支持以下主要配置项:

顶级配置

logging:
  level: info  # 日志级别:debug, info, warn, error

streams:       # 流定义列表
  - input:      # 输入配置
      # ...
    pipeline:   # 处理管道配置
      # ...
    output:     # 输出配置
      # ...

输入组件

ArkFlow 支持多种输入源:

示例:

input:
  type: kafka
  brokers:
    - localhost:9092
  topics:
    - test-topic
  consumer_group: test-group
  client_id: arkflow
  start_from_latest: true

处理器

ArkFlow 提供多种数据处理器:

示例:

pipeline:
  thread_num: 4
  processors:
    - type: json_to_arrow
    - type: sql
      query: "SELECT * FROM flow WHERE value >= 10"
    - type: arrow_to_json

输出组件

ArkFlow 支持多种输出目标:

示例:

output:
  type: kafka
  brokers:
    - localhost:9092
  topic: output-topic
  client_id: arkflow-producer

示例

Kafka 到 Kafka 的数据处理

streams:
  - input:
      type: kafka
      brokers:
        - localhost:9092
      topics:
        - test-topic
      consumer_group: test-group

    pipeline:
      thread_num: 4
      processors:
        - type: json_to_arrow
        - type: sql
          query: "SELECT * FROM flow WHERE value > 100"
        - type: arrow_to_json

    output:
      type: kafka
      brokers:
        - localhost:9092
      topic: processed-topic

生成测试数据并处理

streams:
  - input:
      type: "generate"
      context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
      interval: 1ms
      batch_size: 10000

    pipeline:
      thread_num: 4
      processors:
        - type: "json_to_arrow"
        - type: "sql"
          query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor"
        - type: "arrow_to_json"

    output:
      type: "stdout"
965 次点击
所在节点    开源软件
11 条回复
chenquan
4 天前
des
4 天前
基于 yaml 配置文件配置的?感觉迟早是个麻烦的事,除非你一次配置好就不改了
chenquan
4 天前
@des 是的,目前的设计中是采用 yaml 配置哈,不过今后会考虑使用动态更新 yaml 配置的方式来提升体验。如果有更好的方式欢迎随时交流哈
3085570450tt
4 天前
已 star. 同 2 楼一样,yaml 配置可能会劝退一部分人;
对比其他类似的流引擎,比如 risingwave/arroyo, 发现似乎轻量是它的优点?目前暂时没有 benchmark 可用参考的,同时其他两个工具支持的数据源格式更多;
计划有没有其他客户端 sdk, 比如 node/python 等等,与引擎本身进行交互等
如果暂时支持不了更多数据源,是否将数据源 input/output 这一块,抽离成扩展的方式,让用户自定义更好呢?
chenquan
4 天前
@3085570450tt
1. 目前其实 yaml 的配置也很简单的,但是也有一定的门槛(适合用于一定开发经验的人),这个在后期会进一步优化哈。
2. input 、output 、processor 已经支持扩展( https://github.com/chenquan/arkflow/pull/75 ),会尽快完善这块的内容。
chenquan
4 天前
@3085570450tt 欢迎继续关注 arkflow 、有什么想法可以畅所欲言。
des
2 天前
@chenquan 使用 yaml 两个不好的点,一个是不方便调试,另外一个是不方便修改
des
2 天前
@3085570450tt 轻量不是好听点的说法嘛 哈哈哈
chenquan
1 天前
@des 有什么好的解法吗?
des
1 天前
@chenquan 我没有想法,要不就不会光提问题了,哈哈哈🤣
des
1 天前
@chenquan 提供一个可能的做法供参考
就是提供一个接口和页面可以用来在线编辑,以及从数据里面抽取一些数据用来调试
想想这个感觉和 arroyo 有点像哎

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1118752

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX