轻量级 Java 应用消息通知中心

2021-07-09 00:30:23 +08:00
 Aidenboss

轻量级 Java 应用消息通知中心

项目地址

https://github.com/yemingfeng/kit-message

项目背景
  1. 应用集群部署,并且使用了 local cache 。当要清除缓存时,通过 rpc / 消息队列清除,只能清除接收到消息的那个节点,无法清除整个应用集群的 local cache 导致,节点 2 、节点 3 存在脏数据。

  1. 应用集群部署,存在耗时的计算,为了减少计算资源浪费,某个节点更新后需要通知集群内其他节点更新

这里的场景本质是,消息如何广播? 那么会有人问为什么不使用消息队列? 因为消息队列无法很优美的实现这里的场景。比如说 kafka,使用不同的 consumer_group 就可以实现,但不优雅。所以开启了 kit-message 这个项目。

技术清单
核心逻辑

快速使用
server 启动

本地启动

  1. 修改 kit-message-center / application.yml 中 redis 的配置,配置遵循 springboot 规范
  2. mvn clean package
  3. java -jar kit-message-server/target/kit-message-center.jar
  4. server 会监听 8800 端口
client 使用

增加依赖


<dependency>
  <groupId>io.github.yemingfeng</groupId>
  <artifactId>kit-message-client</artifactId>
  <version>1.0.0</version>
</dependency>

发布消息

MessageProducer messageProducer = new MessageProducer("127.0.0.1", 8800);
messageProducer.pub("topic1", "topic1:" + i);
messageProducer.close();

订阅消息

MessageConsumer messageConsumer = new MessageConsumer("127.0.0.1", 8800);
messageConsumer.sub("topic1", new BiConsumer<String, String>() {
    @Override
    public void accept(String topic, String payload) {
      System.out.println("topic1_1:" + topic + "\t" + payload);
    }
});
线上环境部署

kit-message-server 支持集群部署,建议使用 nginx 做转发。

stream {
    upstream kit-message-server {
        server server_ip1:8800;
        server server_ip2:8800; 
    }

    server {
       listen 8800;
       proxy_connect_timeout 1s;
       proxy_timeout 3s;
       proxy_pass kit-message-server;
    }
}
Q&A
  1. 这个项目使用场景?使用消息队列等中间件不香吗?

答:这个项目是基于服务间消息通知这个场景的。解决问题更加明确,也更加轻量。

  1. 为什么不直接封装一个 redis-client 进行消息的收发?而是使用 client/server 的模式?

答:kit-message-server 让项目更加通用,接入更加方便,依赖更少,管理维护成本更低。

欢迎提反馈

2459 次点击
所在节点    Java
14 条回复
MidCoder
2021-07-21 14:01:42 +08:00
所有的有点都是自己 YY 的,你有相关一个系统引入一个新的组件带来的各种风险吗?你能确保你的这个组件有多少个 9 的可用性?以及面对真实的生产场景,面对上亿的消息量,你的 redis 机器,和你的 message-center 集群如何实现横向扩展?能否通过简单的加机器就能解决?我看现在你的架构都不具备。你当前的架构就是一个 Toy,练手我觉得可以,但是千万不要用于实际业务场景,否则业务会被害惨的。所以上面说的第一点,纯属自己 YY,当前哪个消息中间件整个稳定性和横向扩展性没有 99%以上?请用成人思维去思考。

架构的本质是去繁化简,多增加一个组件和一行代码都是需要经过仔细思考的,所以你上面说的第二点,无疑是为了自己的技术热情而引入的没必要的架构复杂度

欣赏你对技术的热情,可以作为技术分享对你的技术理解,但不要轻易的将自己的东西当做框架或者开源推出去,这是一种对技术不负责任的态度。
Aidenboss
2021-07-21 20:21:41 +08:00
@MidCoder
1. 引入的风险就看各自的承接了。只要达到 99.9% 可用即可
2. 横向拓展是通过 redis 做的,这里的逻辑推演下即可
3. 可以通过简单的加机器完成

请你先了解好以上再来评价。
MidCoder
2021-07-21 22:23:37 +08:00
@Aidenboss
1 、你这里 99.9%的可用率在亿级别场景肯定达不到,首先你的 center 如何做到容错和负载?这一点你的架构一点都没实现,怎么保障你说的三个九的可用?至少机器挂了你这一点容错能力都没有。
2 、你这里不只是 redis 水平,还有你的 center,你的 center 只是单点?单点你就敢承载亿级别的场景?太异想天开了。如果你 center 要做分布式,那就涉及到 center 的发现和负载,以及 center 和 redis 之间的归属如何分配?不是你想的这么简单。

学习角度我是赞赏的,但是不要那这个去做实际生产场景,赞赏你的分享精神,但是如果你想证明自己,请把这个东西做到你说的三个九。如果这样简单搞一下就保障了三个九,你以为大厂哪些人是吃素的?
Aidenboss
2021-07-21 22:52:02 +08:00
@MidCoder
你再仔细看下为啥 center 不是单点的吧
MidCoder
2021-07-22 09:55:42 +08:00
@Aidenboss 那整个架构 redis 将会是单点,虽然你的 center 可以通过 niginx 负载,但是如何解决单个 topic 消息量倾斜,导致 redis 集群负载不均衡,如果基于 redis,那 topic 的负载如何基于 reids 来实现,是你这个能否规模化的关键,至少这里没有实现或者体现。

如果想真正研究大厂的基础架构,欢迎加 Vbieber-cn,来我厂一起搞事情
Aidenboss
2021-07-22 10:07:24 +08:00
@MidCoder
1. redis 单点就由 redis-cluster 解决。为啥要让应用层解决中间件单点的问题。
2. center 是可以水平扩展的,已经解决了多 topic 的问题了。如果是单 topic 的消息负载,这点确实提醒了我,没有做。但使用 center + redis 参考 redis 模糊匹配的订阅模式即可。

以上已经解决了你的问题。
Aidenboss
2021-07-22 11:31:56 +08:00
Aidenboss
2021-07-22 11:43:55 +08:00
@Aidenboss
重新补充下细节,topic 消息负载使用 topic 内消息分片解决。怎么解决呢?
默认分成 8 个分片数「具体随意调整」
前置条件:
redis-cluster 保存每个 topic 的订阅者信息,以及订阅者的 key 。比如 key = topic_s_list,value = [c1:1,3,6,7,c2:2,4,5,8]
每个消息都会按照 routing 算法计算出 key,每个消息都会发送到对应的 key 订阅通道,如:msg:1,代表发送到 msg1 发送给 topic1,只有 c1 才能接收到 msg1 的消息。


当加入一个新的 topic 订阅者,就先发送 topic:stop 指令,c1 、c2 接收到指令后,之后的消息先缓存在 redis 的待发送 list topic_s_pengding 中
c3 通过 lua 脚本,将 topic_s_list 修改为:[c1:1,3,4,c2:2,5,c3:6,7,8],并发送 sub_update 指令
c1 、c2 接收指令后,重新订阅的 key 变成 topic:1 、topic:3 、topic:4 ; c2 的订阅的 key 是:topic:2 、topic:5 ; c3 订阅的 key 是 topic:6 、topic:7 、topic:8
将 topic_s_pengding 中的 key 重新发送到 topic 中

中间会有类似 kafka rebalance 的现象。但其实并不影响消息的生产。

技术是为了解决问题的,提出没解决的点,照着点去设计就好。
MidCoder
2021-07-22 13:23:12 +08:00
@Aidenboss 是否通过真实的大规模场景验证你的这个方案?如果没有,如何验证你的方案是真的可行?
第一:首先采用 redis 方案,看似把最难解决的消息存储交给 redis 已有解决方案来去解决。但是在真实大规模场景下,这会导致网络开销增加了一倍,因为多了一次 center 和 redis 的 request/response 。这种网络开销在亿级别的消息体量下,会严重影响性能
第二:整个集群管理你如何保障?如何让全局感知整个 topic 分片的负载策略?以及当出现网络异常(你的 stop 命令都无法发出的时候)如何保障集群的一致性?以及消息的消费顺序如何保障,如何记录消息消费到了哪里?以及当消费端重启的时候,如何找到之前的消费位置?

只能说你在逐步完善一个消息消费的最基本能力(消息通信),但是对于一个简单的 MQ 场景来说,这只是最简单的部分
Aidenboss
2021-07-22 16:47:58 +08:00
@MidCoder
你没理解这个场景,既然使用了 redis,就不需要保障消息可靠和一致性。
这个场景只需要解决消息传递即可,这也是这个项目的本意。
如果要做更牛的功能,直接用 RabittMQ 解决好了。
不需要什么事情都推到大规模、亿万消息、机制性能。技术是为了解决问题而存在的。只要解决那个场景的问题,自然就有存在的价值。如果脱离的场景,只考虑技术难点,会忽略了解决这个问题的本质。
MidCoder
2021-07-22 17:42:24 +08:00
@Aidenboss 那你是如何解决消息消费者消费到一半,宕机或者重新部署,消息不丢失的?如果你不记录消息消费位置的?
Aidenboss
2021-07-22 19:04:47 +08:00
@MidCoder 如果你需要解决,可以用: https://www.xuxueli.com/xxl-mq/
MidCoder
2021-07-22 21:35:18 +08:00
@Aidenboss 看来研究的开源项目不少,来不来我们这边搞事情?我们这边是搞基础架构的
Jwyt
2021-07-23 08:55:27 +08:00
好家伙,楼上是来面试的?

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/788412

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX