同一程序内的消息队列,这样用单例的简易实现比 redis 的 Pub-Sub 效率高吗?如果不实现 Unsubscribe,有内存泄漏风险吗?

35 天前
 drymonfidelia
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class BasicMessageBus
{
    private static readonly BasicMessageBus _instance = new();
    public static BasicMessageBus Inst => _instance;

    private readonly Subject<object> _messages = new();
    public IObservable<T> Subscribe<T>() => _messages.OfType<T>();
    public void Send(object message) => _messages.OnNext(message);
}
1078 次点击
所在节点    .NET
3 条回复
hez2010
31 天前
没有看到哪里有内存泄露的风险。
调用 Subscribe 的方法所在的对象如果被回收了那对 _messages 的引用也就自动没了,除非你是在哪个具有 static 生命周期的对象中调用了 Subscribe 。
coder001
28 天前
进程内队列? 为啥不用 Channel

https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.channels.channel?view=net-8.0


另外,如果是事件总线,可以考虑引入泛型之类的花样类型匹配订阅筛选器

这是自用的事件总线实现,目前大规模用在工作生产环境和玩具项目,未发现明显性能瓶颈


IEventBus.cs

```
public interface IEventBus
{
bool Subscript<T>(Action<T> callBack);

bool UnSubscript<T>(Action<T> callBack);

bool Publish<T>();

bool Publish<T>(T obj);
}
```

AnyPublishEvent.cs

```
/// <summary>
/// 任何事件发布,用于统计或通配
/// </summary>
[DisplayName("*")]
public record AnyPublishEvent(Type Type, object? Obj);
```

InProcessEventBusBase.cs

```
public abstract class InProcessEventBusBase(ILogger<InProcessEventBusBase> logger) : IEventBus
{
private readonly Dictionary<Type, HashSet<Delegate>> _dicTypeToHandlers = [];

public bool Subscript<T>(Action<T> callBack)
{
var type = typeof(T);
lock (_dicTypeToHandlers)
{
if (!_dicTypeToHandlers.TryGetValue(type, out var handlers))
{
handlers = _dicTypeToHandlers[type] = [];
}

return handlers.Add(callBack); // 忽略重复
}
}

public bool UnSubscript<T>(Action<T> callBack)
{
lock (_dicTypeToHandlers)
{
if (_dicTypeToHandlers.TryGetValue(typeof(T), out var handlers))
{
var unSubscript = handlers.Remove(callBack);

if (handlers.Count == 0) _dicTypeToHandlers.Remove(typeof(T));

return unSubscript;
}

return false;
}
}

public bool Publish<T>()
{
PublishInternal(new AnyPublishEvent(typeof(T), default));
return PublishInternal<T?>(default);
}

public bool Publish<T>(T obj)
{
PublishInternal(new AnyPublishEvent(typeof(T), obj));
return PublishInternal(obj);
}

private bool PublishInternal<T>(T eventValue)
{
var type = typeof(T);

Delegate[] subscripts;
lock (_dicTypeToHandlers)
{
if (!_dicTypeToHandlers.TryGetValue(type, out var handlers)) return false;
subscripts = [.. handlers];
}

foreach (var del in subscripts)
{
try
{
((Action<T>)del)(eventValue);
}
catch (Exception e)
{
logger.LogError(e, nameof(Publish));
}
}

return true;
}
}
```
coder001
28 天前
(看来似乎回帖没有代码格式支持,而且 gist 连接展开的特性似乎也没了,凑合看吧🌚)

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1091813

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX