V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
huang9
V2EX  ›  程序员

vector 的 vrl 语言,如何消费 kafka 的 [{"a":1},{"b":2},{"c":12},{"z":83}]这种数据?

  •  
  •   huang9 · 4 小时 11 分钟前 · 303 次点击

    vector 的 vrl 语言,如何消费 kafka 的 [{"a":1},{"b":2},{"c":12},{"z":83}]这种数据?

    vector 太强大了,比 logstash 强很多倍,发现好多 vrl 语法用不会,看看有大神在用这个的吗,给指点一下,以下是我在 stackoverflow 发的一个帖子

    https://stackoverflow.com/questions/79013873/how-to-use-vectors-vrl-language-to-consume-data-like-a1-b2-c12

    拜谢

    7 条回复    2024-09-23 20:19:01 +08:00
    mark2025
        1
    mark2025  
       4 小时 5 分钟前
    这不就是 json 格式么?
    sys64
        2
    sys64  
       4 小时 4 分钟前
    在使用 [Vector]( https://vector.dev/)( vrl 语言)时,消费 Kafka 数据需要遵循几个步骤。假设你已经通过 Vector 连接 Kafka ,并将 Kafka 数据流转换为 Vector 能处理的日志结构。接下来,你可以使用 VRL (Vector Remap Language) 来处理像 `[{ "a": 1 }, { "b": 2 }, { "c": 12 }, { "z": 83 }]` 这样的 JSON 数据。

    以下是消费 Kafka 数据的流程:

    ### 1. Vector 配置 Kafka Source
    首先,确保你的 `vector.toml` 文件中已经配置好了 Kafka Source 。简单示例如下:

    ```toml
    [sources.kafka]
    type = "kafka"
    bootstrap_servers = "localhost:9092"
    group_id = "consumer-group-id"
    topics = ["my-topic"]
    key_field = "key" # 如果 Kafka 数据有 key
    value_field = "message" # 消息数据字段
    encoding.codec = "json" # 假设 Kafka 数据是 JSON 格式
    ```

    ### 2. 使用 VRL 处理 Kafka 消息
    假设 Kafka 消息的 `message` 字段包含了你的数据,例如 `[{ "a": 1 }, { "b": 2 }, { "c": 12 }, { "z": 83 }]`,你可以使用 VRL 脚本来提取和处理这些字段。

    #### VRL 脚本示例:
    ```toml
    [transforms.kafka_parser]
    type = "remap"
    inputs = ["kafka"]
    source = '''
    # 假设 message 字段包含 JSON 数据数组
    .data = parse_json!(.message)
    '''
    ```

    ### 3. 消费数据
    当 `message` 是 `[{ "a": 1 }, { "b": 2 }, { "c": 12 }, { "z": 83 }]` 这样的数组时,VRL 处理步骤可以提取每个字段。

    你可以直接在 `source` 部分进行各种操作,比如遍历 JSON 数组或处理特定的值:

    ```toml
    source = '''
    .data = parse_json!(.message)

    # 遍历数组,提取每个元素
    .fields = []
    for item in .data do
    .fields = .fields + [get(item, "a", "undefined")]
    end
    '''
    ```

    ### 4. 输出处理后的数据
    处理完 Kafka 数据后,你可以将结果传递给某个 Sink ,例如 Console 、Elasticsearch 等。

    ```toml
    [sinks.console]
    type = "console"
    inputs = ["kafka_parser"]
    encoding.codec = "json"
    ```

    ### 示例流程总结
    1. Kafka source 接收 `[{ "a": 1 }, { "b": 2 }, { "c": 12 }, { "z": 83 }]`。
    2. 使用 VRL 的 `parse_json!` 函数解析 JSON 数据。
    3. 遍历数组、提取每个 JSON 对象中的值。
    4. 输出处理后的数据到 sink ,例如 Console 。

    你可以根据业务需求修改 VRL 逻辑,处理和过滤特定字段、对值进行转换等。
    fruitmonster
        3
    fruitmonster  
       4 小时 1 分钟前
    @sys64 你使用对话模型,回帖会被封的
    me262
        4
    me262  
       2 小时 44 分钟前
    superchijinpeng
        5
    superchijinpeng  
       2 小时 35 分钟前
    vector 中,你这种数据可以直接在 sink 里,一条一条消费的
    huang9
        6
    huang9  
    OP
       57 分钟前
    @superchijinpeng sink 还能处理吗? 请问大佬怎么处理呢
    huang9
        7
    huang9  
    OP
       56 分钟前
    @sys64 这个数据本身都不是 json
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2933 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 13:15 · PVG 21:15 · LAX 06:15 · JFK 09:15
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.