求问如何用 Python 读取 Kafka topic 最新一条数据的 timestamp

2020-09-24 10:46:30 +08:00
 levelworm

我想要读 Kafka topic 最新一条数据的时间戳,然后用这个时间戳在 vertica 里头做为 partition 的依据。不过现在的问题是,使用seek_to_end()之后,就陷入到死循环了,一条数据也没放出来,我觉得应该能放出最后一条消息的啊?请问我哪里弄错了?

这是我的代码:


topic: str = 'mytopic'
broker: str = 'myserver'

consumer = KafkaConsumer(
    bootstrap_servers=[broker],
    enable_auto_commit=True
)

tp = TopicPartition(topic, 0)
consumer.assign([tp])
consumer.poll()
consumer.seek_to_end()

for message in consumer:
    print(message.timestamp)
    print(message)

consumer.close()

1722 次点击
所在节点    Python
2 条回复
levelworm
2020-09-24 10:55:05 +08:00
我又试了下,可以看到 offset,诡异啊。。。照理说能看到 offset 不就应该能够看到消息吗?还是我理解错了。
levelworm
2020-09-24 11:12:31 +08:00
我搞定了我,原来 seek_to_end()之后他给我的是最后一个 offset+1,那我的 Kafka 此时并没有接受新的消息,那肯定是不会显示任何东西了,我只需要把 offset-1 然后再 seek 即可。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/710027

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX