请教, kafka 如何做到一个 topic 分发不同的类型的消息

2023-04-25 11:46:42 +08:00
 NoKey
场景是这样的,上游服务 A ,通过 kafka 发消息个下游服务 B,C,D

为了后续集成方便,A 使用了一个 Topic

这个时候,需要 BCD 接收自己的消息

这种场景下,如何才能控制 BCD 只收到自己的消息,不收别人的消息呢?

考虑了几种方式:
1. 通过 key 。这样下游服务只有收到消息之后才知道 key 是啥,不是自己的丢弃,但是这样必须收消息,也就是 B 会收到 C ,D 的消息,感觉不好。
2. 通过分区。不同下游的消息放到不同的分区,但是这样会造成分区不均衡,部分分区过大。

请问一下大家有没有更好的办法呢?谢谢
2397 次点击
所在节点    程序员
25 条回复
zhaoyy0513
2023-04-25 16:23:45 +08:00
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class KafkaStreamsExample {
public static void main(String[] args) {
// 设置 Kafka Streams 属性
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// 创建 Kafka Streams 实例
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("topic-A");

// 根据消息的 key 将消息路由到不同的分区中
stream.selectKey((key, value) -> key)
.through("topic-A-shuffle")
.groupByKey()
.foreach((key, value) -> {
// 处理消息
System.out.println("Processed message: " + value);
});

// 将处理后的消息发送到下游服务
stream.mapValues(value -> "processed " + value)
.to("topic-B", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.start();
}
}

在上面的代码中,首先使用 selectKey()方法将消息的 key 作为新的 key ,然后使用 through()方法将消息发送到一个新的 Topic 中,这个新的 Topic 会使用 Kafka 默认的分区策略将消息路由到不同的分区中。然后,我们使用 groupByKey()方法将同一个 key 的消息分组,确保每个消费者只消费自己需要的消息。最后,我们使用 foreach()方法处理分组后的消息,并使用 mapValues()方法将处理后的消息发送到下游服务。

需要注意的是,使用分流操作可能会导致数据倾斜(data skew)问题,因为某些 key 的消息可能比其他 key 的消息更频繁,从而导致某些分区比其他分区拥有更多的消息。为了解决这个问题,可以使用一些分区策略(partitioning strategy),例如随机分配、循环分配、哈希分配等。
burymme11
2023-04-25 16:34:11 +08:00
可以中间自己加一个路由层。
新建一个中间层 AA ,来监听 topic ,处理上游服务 A 的消息,在 AA 里面,自己写代码做负载均衡,比如根据消息 ID 取模,给 B ,C ,D 分配好不同的 key ,最后所有消息再往新的 NewTopic 里丢。这样 B ,C ,D 就监听 NewTopic 就行,以后要加薪的下游服务,你只要改动 AA 层分发路由的代码就好。
Dlin
2023-04-25 16:37:01 +08:00
kafka 的 topic 和 rabbitmq 的 topic 不一样么。
zhaoyy0513
2023-04-25 16:37:30 +08:00
要实现上游系统 A 将消息发送到下游系统 B 、C 、D ,并确保每个下游系统只处理自己需要处理的消息,同时还要确保消息只被消费一次,可以采用以下方案:

使用 Kafka 作为消息中间件,将上游系统 A 发送的消息发布到一个名为"topic-A"的 Kafka 主题中。

在下游系统 B 、C 、D 中,创建三个不同的消费者组,分别为"group-B"、"group-C"、"group-D",并订阅"topic-A"主题。

在消费者端,使用 Kafka 中的消息过滤器来过滤掉不需要的消息,只选择要处理的消息。可以使用 Kafka 中的消息键(key)来实现过滤。例如,下游系统 B 只想处理键(key)为"key-B"的消息,可以使用以下代码来实现:

java
Copy
// 创建 Kafka 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group-B");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅"topic-A"主题
consumer.subscribe(Collections.singletonList("topic-A"));

// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (record.key().equals("key-B")) {
// 处理消息
}
}
consumer.commitSync();
}
```

为了确保消息只被消费一次,将消费者的 auto.offset.reset 属性设置为"earliest",并启用自动提交偏移量。这将确保消费者在启动时从最早可用的偏移量开始消费,以避免漏掉任何消息,并且将自动提交偏移量以确保每个消息只被消费一次。例如,可以使用以下代码来实现:

java
Copy
// 创建 Kafka 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group-B");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```
使用上述方案,上游系统 A 可以将消息发送到"topic-A"主题中,下游系统 B 、C 、D 可以使用 Kafka 消费者订阅该主题,并使用消息过滤器来过滤掉不需要的消息,只选择要处理的消息。自动提交偏移量将确保每个消息只被消费一次。





上面两条回复都是 chatgpt 回复的
PythonYXY
2023-04-25 16:46:13 +08:00
为什么不建多个 topic 呢,如果下游服务不固定可以做成配置式的啊

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/935312

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX