@lockerhyz 你好,如果你有时间的话,可以帮我看一下吗?以下是工作流程:
消息是 HTTP 请求表单数据
publishMessage.go 发布消息
consumeMessage.go 消费消息
FILE 1 main.go
package main
import (
"github.com/labstack/echo"
)
// User is the payload that createUserInNSQ serialises to JSON and publishes.
type User struct {
	Name string
}
// main starts the Echo HTTP server on :1323 and routes
// POST /api/v1/applicant/register/user to createUserInNSQ.
func main() {
	e := echo.New()

	r := e.Group("/api/v1/applicant/register/user")
	r.POST("", createUserInNSQ)

	// Start returns an error when the server cannot listen or stops; report
	// it instead of letting the process exit silently.
	e.Logger.Fatal(e.Start(":1323"))
}
FILE 2 publishMessage.go
package main
import (
...
"github.com/nsqio/go-nsq"
...
)
// tcpNsqdAddr is the nsqd TCP address that publishMessage hands to
// nsq.NewProducer.
var tcpNsqdAddr = "127.0.0.1:4150"
// publishMessage publishes command to the given NSQ topic on the nsqd at
// tcpNsqdAddr.
//
// It returns the error from creating the producer or from publishing, and nil
// only when the publish call succeeded.
//
// NOTE(review): a producer is created for every call. Sharing one long-lived
// producer would avoid reconnecting per message; that needs package-level
// state and is left out of this function.
func publishMessage(topic string, command []uint8) error {
	config := nsq.NewConfig()
	producer, err := nsq.NewProducer(tcpNsqdAddr, config)
	if err != nil {
		return err
	}
	// Stop the producer on the way out so its connection is not leaked on
	// every request.
	defer producer.Stop()

	return producer.Publish(topic, command)
}
// createUserInNSQ is the Echo handler for user registration. It binds the
// request data into a User, encodes it as JSON and publishes it to the
// "InsertUser" topic.
//
// It responds with 200 "OK" once the message has been published. Any error
// from binding, encoding or publishing is returned to Echo so the client does
// not receive a success response for a message that was never queued.
func createUserInNSQ(c echo.Context) (err error) {
	u := new(User)
	// Without Bind the User stays empty and an empty payload is published.
	if err = c.Bind(u); err != nil {
		return err
	}

	const topic = "InsertUser"
	payload, err := json.Marshal(u)
	if err != nil {
		return err
	}
	if err = publishMessage(topic, payload); err != nil {
		return err
	}
	return c.String(http.StatusOK, "OK")
}
FILE 3 consumeMessage.go
package main
import (
...
"github.com/nsqio/go-nsq"
...
)
// tcpNsqdAddr is the nsqd TCP address that main passes to ConnectToNSQD.
var tcpNsqdAddr = "127.0.0.1:4150"
// NsqHandler is the message handler type registered with the NSQ consumer;
// its HandleMessage method processes each delivered message.
type NsqHandler struct {
	// NsqHandlerId labels a handler instance. main constructs the handler
	// with this field, so it must be declared for the program to compile.
	NsqHandlerId string
}
// HandleMessage is the handler method of NsqHandler, which main registers
// with the consumer through AddHandler. The processing logic is elided in
// this excerpt; as shown, the method returns nil.
// NOTE(review): in go-nsq a nil return presumably marks the message as
// finished and a non-nil return requeues it — confirm against the library
// docs before relying on it for the MySQL insert.
func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
// TODO: insert the message into MySQL (implementation elided here).
...
return nil
}
// main runs the consumer process: it subscribes to topic "InsertUser" on
// channel "channel1", registers an NsqHandler, connects to the nsqd at
// tcpNsqdAddr and then blocks forever while messages are handled.
//
// Startup errors abort the process. Previously they were discarded, so a
// failed connection left the program blocked without consuming anything.
func main() {
	config := nsq.NewConfig()
	com, err := nsq.NewConsumer("InsertUser", "channel1", config)
	if err != nil {
		panic(err)
	}

	// The handler must be added before connecting.
	com.AddHandler(&NsqHandler{NsqHandlerId: "one"})

	if err := com.ConnectToNSQD(tcpNsqdAddr); err != nil {
		panic(err)
	}

	// Block forever: Done is never called on this WaitGroup, so Wait keeps
	// main alive while the consumer works in the background.
	var wg = &sync.WaitGroup{}
	wg.Add(1)
	wg.Wait()
}