V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
chenquan
V2EX  ›  开源软件

ArkFlow – 高性能 Rust 流处理引擎

  •  
  •   chenquan · 22 小时 56 分钟前 · 800 次点击

    github: https://github.com/chenquan/arkflow

    高性能 Rust 流处理引擎,提供强大的数据流处理能力,支持多种输入输出源和处理器。

    特性

    • 高性能:基于 Rust 和 Tokio 异步运行时构建,提供卓越的性能和低延迟
    • 多种数据源:支持 Kafka 、MQTT 、HTTP 、文件等多种输入输出源
    • 强大的处理能力:内置 SQL 查询、JSON 处理、Protobuf 编解码、批处理等多种处理器
    • 可扩展:模块化设计,易于扩展新的输入、输出和处理器组件

    安装

    从源码构建

    # 克隆仓库
    git clone https://github.com/chenquan/arkflow.git
    cd arkflow
    
    # 构建项目
    cargo build --release
    
    # 运行测试
    cargo test
    

    快速开始

    1. 创建配置文件 config.yaml
    logging:
      level: info
    streams:
      - input:
          type: "generate"
          context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
          interval: 1s
          batch_size: 10
    
        pipeline:
          thread_num: 4
          processors:
            - type: "json_to_arrow"
            - type: "sql"
              query: "SELECT * FROM flow WHERE value >= 10"
            - type: "arrow_to_json"
    
        output:
          type: "stdout"
    
    1. 运行 ArkFlow:
    ./target/release/arkflow --config config.yaml
    

    配置说明

    ArkFlow 使用 YAML 格式的配置文件,支持以下主要配置项:

    顶级配置

    logging:
      level: info  # 日志级别:debug, info, warn, error
    
    streams:       # 流定义列表
      - input:      # 输入配置
          # ...
        pipeline:   # 处理管道配置
          # ...
        output:     # 输出配置
          # ...
    

    输入组件

    ArkFlow 支持多种输入源:

    • Kafka:从 Kafka 主题读取数据
    • MQTT:从 MQTT 主题订阅消息
    • HTTP:通过 HTTP 接收数据
    • 文件:从文件读取数据
    • 生成器:生成测试数据
    • SQL:从数据库查询数据

    示例:

    input:
      type: kafka
      brokers:
        - localhost:9092
      topics:
        - test-topic
      consumer_group: test-group
      client_id: arkflow
      start_from_latest: true
    

    处理器

    ArkFlow 提供多种数据处理器:

    • JSON:JSON 数据处理和转换
    • SQL:使用 SQL 查询处理数据
    • Protobuf:Protobuf 编解码
    • 批处理:将消息批量处理

    示例:

    pipeline:
      thread_num: 4
      processors:
        - type: json_to_arrow
        - type: sql
          query: "SELECT * FROM flow WHERE value >= 10"
        - type: arrow_to_json
    

    输出组件

    ArkFlow 支持多种输出目标:

    • Kafka:将数据写入 Kafka 主题
    • MQTT:将消息发布到 MQTT 主题
    • HTTP:通过 HTTP 发送数据
    • 文件:将数据写入文件
    • 标准输出:将数据输出到控制台

    示例:

    output:
      type: kafka
      brokers:
        - localhost:9092
      topic: output-topic
      client_id: arkflow-producer
    

    示例

    Kafka 到 Kafka 的数据处理

    streams:
      - input:
          type: kafka
          brokers:
            - localhost:9092
          topics:
            - test-topic
          consumer_group: test-group
    
        pipeline:
          thread_num: 4
          processors:
            - type: json_to_arrow
            - type: sql
              query: "SELECT * FROM flow WHERE value > 100"
            - type: arrow_to_json
    
        output:
          type: kafka
          brokers:
            - localhost:9092
          topic: processed-topic
    

    生成测试数据并处理

    streams:
      - input:
          type: "generate"
          context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
          interval: 1ms
          batch_size: 10000
    
        pipeline:
          thread_num: 4
          processors:
            - type: "json_to_arrow"
            - type: "sql"
              query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor"
            - type: "arrow_to_json"
    
        output:
          type: "stdout"
    
    第 1 条附言  ·  21 小时 52 分钟前
    欢迎大家讨论,一起完善它哈,
    感兴趣的朋友可以帮忙点个星星哦!
    6 条回复    2025-03-16 18:24:31 +08:00
    chenquan
        1
    chenquan  
    OP
       22 小时 55 分钟前
    des
        2
    des  
       21 小时 15 分钟前
    基于 yaml 配置文件配置的?感觉迟早是个麻烦的事,除非你一次配置好就不改了
    chenquan
        3
    chenquan  
    OP
       21 小时 8 分钟前
    @des 是的,目前的设计中是采用 yaml 配置哈,不过今后会考虑使用动态更新 yaml 配置的方式来提升体验。如果有更好的方式欢迎随时交流哈
    3085570450tt
        4
    3085570450tt  
       13 小时 33 分钟前
    已 star. 同 2 楼一样,yaml 配置可能会劝退一部分人;
    对比其他类似的流引擎,比如 risingwave/arroyo, 发现似乎轻量是它的优点?目前暂时没有 benchmark 可用参考的,同时其他两个工具支持的数据源格式更多;
    计划有没有其他客户端 sdk, 比如 node/python 等等,与引擎本身进行交互等
    如果暂时支持不了更多数据源,是否将数据源 input/output 这一块,抽离成扩展的方式,让用户自定义更好呢?
    chenquan
        5
    chenquan  
    OP
       13 小时 17 分钟前
    @3085570450tt
    1. 目前其实 yaml 的配置也很简单的,但是也有一定的门槛(适合用于一定开发经验的人),这个在后期会进一步优化哈。
    2. input 、output 、processor 已经支持扩展( https://github.com/chenquan/arkflow/pull/75 ),会尽快完善这块的内容。
    chenquan
        6
    chenquan  
    OP
       13 小时 16 分钟前
    @3085570450tt 欢迎继续关注 arkflow 、有什么想法可以畅所欲言。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1138 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 23:40 · PVG 07:40 · LAX 16:40 · JFK 19:40
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.