[natsio] 使用入门问题

302 天前
 RedBeanIce
docker pull bitnami/nats:2.10.11
docker run -ti -p 4222:4222 -p 8222:8222 --name nats bitnami/nats:2.10.11

依赖

go get github.com/nats-io/nats.go/@v1.33.1

推送消息

func Test_natsio_request(t *testing.T) {
	url := nats.DefaultURL

	nc, _ := nats.Connect(url)
	defer nc.Drain()

	// Matches all of the above
	nc.Publish("foo", []byte("Hello World")) // Use the response

	println("====================  request request request")
}

接收消息


func Test_natsio_resp(t *testing.T) {
	url := nats.DefaultURL

	nc, _ := nats.Connect(url)
	defer nc.Drain()

	nc.Subscribe("foo", func(m *nats.Msg) {
		fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
	})
	println("====================  resp ")
}

相关代码配置如上所示,最终我失败了,没有打印 Msg received on 这个打印。

我查看了官方的相关文档,b 站的使用视频,我进行了一些尝试,但是仍然没有成功。

·············································

请问我如何才能正确的接收消息呢。。。

1407 次点击
所在节点    Go 编程语言
8 条回复
RedBeanIce
302 天前
@XCFOX 求大佬解答一个新手问题额,我不知道我哪里错误了。

如有冒犯,非常抱歉。。
kumoocat
302 天前
官方示例有说:
https://natsbyexample.com/examples/messaging/pub-sub/go

There are two circumstances when a published message won’t be delivered to a subscriber:
The subscriber does not have an active connection to the server
...


func TestNats(t *testing.T) {
url := nats.DefaultURL

nc, _ := nats.Connect(url)
defer nc.Drain()

var wg sync.WaitGroup
wg.Add(1)
nc.Subscribe("foo", func(m *nats.Msg) {
defer wg.Done()
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

nc.Publish("foo", []byte("Hello World"))

wg.Wait()
}
XCFOX
302 天前
我平常主要写 node.js ,为了回复你这个问题特意装了 go 的环境。

你的问题是接受程序(Test_natsio_resp) 从上到下走完直接完事儿了,没有等待下一条消息这个程序就退出了。

你需要想办法让程序等待消息进来,具体代码请参考:
https://github.com/nats-io/go-nats-examples/blob/main/api-examples/subscribe_async/main.go
XCFOX
302 天前
除了添加 WaitGroup 用于等待消息,你还得在发布端使用 defer nc.Close() 而不是 defer nc.Drain(),Drain 状态下是不能发布消息的。
RedBeanIce
301 天前
@kumoocat
@XCFOX

感谢,确实是可以了。。。。

单个文件内解决的代码如 kumoocat 所示
多个文件如下,主题先要启动接收端,再启动发送端。(注意:先启动发送端再启动接收端,消息会丢失。)


```go
发送端

func Test_natsio_request(t *testing.T) {
url := nats.DefaultURL

nc, err := nats.Connect(url)
if err != nil {
log.Fatal(err)
return
}
defer nc.Close()

// Matches all of the above
nc.Publish("foo", []byte("Hello World")) // Use the response

println("==================== request request request")
}
```


```go
接收端

func Test_natsio_resp(t *testing.T) {
url := nats.DefaultURL

nc, err := nats.Connect(url)
if err != nil {
log.Fatal(err)
return
}
defer nc.Drain()

var wg sync.WaitGroup
wg.Add(1)
_, err2 := nc.Subscribe("foo", func(m *nats.Msg) {
defer wg.Done()
fmt.Printf(time.Now().String(), "Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})
if err2 != nil {
log.Fatal(err2)
return
}
wg.Wait()
println("==================== resp ")
}

```
RedBeanIce
301 天前
@XCFOX 但是我不太明白,为什么先启动发送端,再启动接收端,消息就丢了呢。。

容我有空了,去翻一下源码。。。。
liuhan907
301 天前
@RedBeanIce 因为 nats 的消息是发送时没有接收者的话就会被丢弃,不会保存。如果有需求应该用 nats jetstream 。
RedBeanIce
301 天前
@liuhan907 喔!!!原来如此!!

感谢大佬的回复!我现在去试试,jetstream !!!

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1019269

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX