求助 go-nsq 消费者的问题

2019-06-04 17:14:41 +08:00
 dt2vba
大佬,下午好,我使用 go-nsq 消息队列处理请求时遇到一个问题。

假设有以下三个文件
main.go publishMessage.go consumeMessage.go

启动服务
go run main.go publishMessage.go
发送请求
curl -d "localhost:1323" -d "MESSAGE"

现在的问题是,我可以发布消息,但是只能启动新的进程去消费消息,go run consumeMessage.go

请问如何在一个进程中处理请求呢?
2245 次点击
所在节点    程序员
8 条回复
lockerhyz
2019-06-04 17:27:25 +08:00
go run main.go publishMessage.go consumeMessage.go
dt2vba
2019-06-04 18:18:42 +08:00
@lockerhyz 谢谢关注。我试了一下,不能正常工作。
dt2vba
2019-06-04 18:55:51 +08:00
@lockerhyz

你好,如果你有时间的话,可以帮我看一下吗?以下是工作流程,

消息是 HTTP 请求表单数据

publishMessage.go 发布消息

consumeMessage.go 消费消息


FILE 1 main.go

package main

import (
"github.com/labstack/echo"
)

type User struct {Name string}

func main() {
e:=echo.New()
r:=e.Group("/api/v1/applicant/register/user")
r.POST("",createUserInNSQ)
e.Start(":1323")
}


FILE 2 publishMessage.go

package main

import (
...
"github.com/nsqio/go-nsq"
...
)

var tcpNsqdAddr="127.0.0.1:4150"

func publishMessage(topic string,command []uint8) error {
config:=nsq.NewConfig()
producer,_:=nsq.NewProducer(tcpNsqdAddr,config)
producer.Publish(topic,command)
return nil
}

func createUserInNSQ(c echo.Context) (err error) {
u:=new(User)

topic:="InsertUser"
command,err:=json.Marshal(u)
publishMessage(topic,command)

return c.String( http.StatusOK,"OK")
}


FILE 3 consumeMessage.go

package main

import (
...
"github.com/nsqio/go-nsq"
...
)

var tcpNsqdAddr="127.0.0.1:4150"

type NsqHandler struct {
}

func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
//insert message into MySQL
...

return nil
}

func main() {
config:=nsq.NewConfig()
com,_:=nsq.NewConsumer("InsertUser","channel1",config)
com.AddHandler(&NsqHandler{NsqHandlerId:"one"})
com.ConnectToNSQD(tcpNsqdAddr)

var wg=&sync.WaitGroup{}
wg.Add(1)
wg.Wait()
}
keepeye
2019-06-04 18:58:31 +08:00
学会使用 goroutine
dt2vba
2019-06-04 19:03:28 +08:00
@keepeye 你好,感谢提供帮助!
lockerhyz
2019-06-11 16:08:13 +08:00
@dt2vba
1. 把 consumeMessage.go 的 main 函数修改成其他函数名,例如 consumer()
2. 修改 main.go 中的 main 函数

```
func main() {
go consumer()
e:=echo.New()
r:=e.Group("/api/v1/applicant/register/user")
r.POST("",createUserInNSQ)
e.Start(":1323")
}
```
lockerhyz
2019-06-11 16:09:39 +08:00
@lockerhyz
3. go run main.go publishMessage.go consumeMessage.go
dt2vba
2019-06-12 15:23:28 +08:00
@lockerhyz 非常感谢你的帮助。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/570826

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX