当下遇到的问题:
服务提供商:
1. 集群每个节点的吞吐量在 1.5 MB/s 左右,远小于服务的吞吐量
2. 3 个节点每个 topic 设置 90 个分区, 3 副本,这个使用方式不太合理,服务需要对每个 topic 维护 90x3 个 replica 进程,io process 也要维护 90x3 个,原来顺序的读写也会退化为随机读写,网络 process 需要维护 90 个
3. 看历史监控记录,副本延迟在过去是会频繁发生的
4. 之前有建议您修改分区到 6 ~ 9 个 您这边反馈分区数调低之后消费者有延迟,实际您这边的吞吐量远没有达到服务应该有的吞吐量,怀疑是客户端方面有问题,需要您在消费端打印每次 poll 的时间和 poll 下来的消息条数,确定消费者行为,这样我们可以进一步分析
现在我们这边的解决方案还是和之前的建议一样,topic 分区数调整到 6 ~ 9 个,消费延迟的问题需要从客户端出发解决
开发者:
调整为 6 个分区之后,不是消费延迟问题,是单个消费者的能力不足,跟不上生产的速度。之前已经试过了,10 来分钟就堆积了 100 万消息。
服务提供商:
6 个分区的话,可以使用 6 个消费者,6 个消费者的能力远不止这么差.
max.poll.records,可以用于指定批量消费条数的
配合配置 max.partition.fetch.byte 和 fetch.max.wait.ms 两个参数 可以实现批量消费 kafka 的消息。您看看 php 的客户端是否有设置这些参数的地方,或者有其他地方可以设置消费者的批量消费的,因为一条条的消费,效率是极低的
开发者:
rdkafka 扩展里,好像没这个相关的参数
当前是 1 个 topic,90 个分区,分区数太多引起 kafka 集群副本同步时的性能下降问题。服务商建议减少分区数,但是减少分区数会有大量的消息堆积,rdkafka 如何提升单消费者的性能呢?
消费者大致代码如下:
$this->RdKafkaConf = new RdKafka\Conf();
$this->RdKafkaConf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
$kafka->assign(null);
break;
default:
throw new \Exception($err);
}
});
$this->RdKafkaConf->set('group.id', $groupid);
// Initial list of Kafka brokers
$this->RdKafkaConf->set('metadata.broker.list', $configs);
$this->RdKafkaConf->set('socket.keepalive.enable', 'true');
$this->RdKafkaConf->set('enable.auto.commit', 'true');
$this->RdKafkaConf->set('auto.commit.interval.ms', '100');
$this->RdKafkaConf->set('auto.offset.reset', 'smallest');
$topic = is_array($topic) ? $topic : [$topic];
$consumer = new RdKafka\KafkaConsumer($this->RdKafkaConf);
$consumer->subscribe($topic);
while (true) {
$message = $consumer->consume($timeout * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
call_user_func_array($callback, [$message]);
// $consumer->commitAsync($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
// Log::get('consumer')->info("No more messages; will wait for more");
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
// Log::get('consumer')->error("Timed out");
break;
default:
throw new \Exception($message->errstr(), $message->err);
}
}
//callback function
if (count(self::$queue) >= 10 || (time() - $this->lastWriteTimestamp) >= 1) {
self::$queue[] = $msg;
$queue = self::$queue;
self::$queue = [];
$this->lastWriteTimestamp = time();
$reportData = [];
foreach ($queue as $message) {
$data = json_decode($message->payload, true);
// 入库
}
} else {
self::$queue[] = $msg;
}
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.