关于 PHP Rdkafka 消费者性能讨论

2021-05-11 11:19:04 +08:00
 yuandj

当下遇到的问题:

服务提供商:
1. 集群每个节点的吞吐量在 1.5 MB/s 左右,远小于服务的吞吐量
2. 3 个节点每个 topic 设置 90 个分区, 3 副本,这个使用方式不太合理,服务需要对每个 topic 维护 90x3 个 replica 进程,io process 也要维护 90x3 个,原来顺序的读写也会退化为随机读写,网络 process 需要维护 90 个
3. 看历史监控记录,副本延迟在过去是会频繁发生的
4. 之前有建议您修改分区到 6 ~ 9 个 您这边反馈分区数调低之后消费者有延迟,实际您这边的吞吐量远没有达到服务应该有的吞吐量,怀疑是客户端方面有问题,需要您在消费端打印每次 poll 的时间和 poll 下来的消息条数,确定消费者行为,这样我们可以进一步分析

现在我们这边的解决方案还是和之前的建议一样,topic 分区数调整到 6 ~ 9 个,消费延迟的问题需要从客户端出发解决

开发者:
调整为 6 个分区之后,不是消费延迟问题,是单个消费者的能力不足,跟不上生产的速度。之前已经试过了,10 来分钟就堆积了 100 万消息。

服务提供商:
6 个分区的话,可以使用 6 个消费者,6 个消费者的能力远不止这么差.
max.poll.records,可以用于指定批量消费条数的
配合配置 max.partition.fetch.byte 和 fetch.max.wait.ms 两个参数 可以实现批量消费 kafka 的消息。您看看 php 的客户端是否有设置这些参数的地方,或者有其他地方可以设置消费者的批量消费的,因为一条条的消费,效率是极低的

开发者:
rdkafka 扩展里,好像没这个相关的参数

当前是 1 个 topic,90 个分区,分区数太多引起 kafka 集群副本同步时的性能下降问题。服务商建议减少分区数,但是减少分区数会有大量的消息堆积,rdkafka 如何提升单消费者的性能呢?

消费者大致代码如下:

$this->RdKafkaConf = new RdKafka\Conf();

$this->RdKafkaConf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    $kafka->assign($partitions);
                    break;
                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    $kafka->assign(null);
                    break;
                default:
                    throw new \Exception($err);
            }
        });

        $this->RdKafkaConf->set('group.id', $groupid);
        // Initial list of Kafka brokers
        $this->RdKafkaConf->set('metadata.broker.list', $configs);
        $this->RdKafkaConf->set('socket.keepalive.enable', 'true');
        $this->RdKafkaConf->set('enable.auto.commit', 'true');
        $this->RdKafkaConf->set('auto.commit.interval.ms', '100');
        $this->RdKafkaConf->set('auto.offset.reset', 'smallest');

        $topic = is_array($topic) ? $topic : [$topic];
        $consumer = new RdKafka\KafkaConsumer($this->RdKafkaConf);
        $consumer->subscribe($topic);
        while (true) {
            $message = $consumer->consume($timeout * 1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    call_user_func_array($callback, [$message]);
//                    $consumer->commitAsync($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
//                    Log::get('consumer')->info("No more messages; will wait for more");
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
//                    Log::get('consumer')->error("Timed out");
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }


//callback function
if (count(self::$queue) >= 10 || (time() - $this->lastWriteTimestamp) >= 1) {
                self::$queue[] = $msg;
                $queue = self::$queue;
                self::$queue = [];
                $this->lastWriteTimestamp = time();
                $reportData = [];
                
                foreach ($queue as $message) {
                    $data = json_decode($message->payload, true);
                    // 入库
                }
} else {
                self::$queue[] = $msg;
}
2038 次点击
所在节点    Kafka
6 条回复
iyaozhen
2021-05-11 13:07:22 +08:00
callback 慢呗,可以多进程( 1-2 倍分区数)同一个 group.id 并行消费。
yuandj
2021-05-11 13:17:05 +08:00
@iyaozhen 用 swoole 的协程试过,多个协程之间会重复消费数据
iyaozhen
2021-05-11 13:40:02 +08:00
@yuandj 不用携程,多进程最合适。确定是相同 group id ?
yuandj
2021-05-11 13:50:50 +08:00
@iyaozhen 一个 topic 下的一个分区,在同一时间,不是只能被一个消费者消费吗?
JKeita
2021-05-11 14:16:14 +08:00
一个 topic 每个分区只会被消费者组里的一个消费者消费。
iyaozhen
2021-05-11 14:51:59 +08:00
@yuandj 是啊,但你可以 n 个主进程消费,然后扔给 task 进程入库

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/776193

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX