求助: GoLang MQTT 客户端使用问题

115 天前


2024-08-29 07:13:10	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_5 , error = pingresp not received, disconnecting
2024-08-29 07:13:14	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_88 , error = pingresp not received, disconnecting
2024-08-29 07:13:15	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_43 , error = pingresp not received, disconnecting
2024-08-29 07:13:15	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_72 , error = pingresp not received, disconnecting
2024-08-29 07:13:15	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_1 , error = pingresp not received, disconnecting
2024-08-29 07:13:17	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_37 , error = pingresp not received, disconnecting
2024-08-29 07:13:18	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_10 , error = pingresp not received, disconnecting
2024-08-29 07:14:13	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_52 , error = pingresp not received, disconnecting
2024-08-29 07:14:18	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_59 , error = pingresp not received, disconnecting
2024-08-29 07:14:19	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_84 , error = pingresp not received, disconnecting
2024-08-29 07:14:19	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_54 , error = pingresp not received, disconnecting
2024-08-29 07:14:21	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_22 , error = pingresp not received, disconnecting
2024-08-29 07:14:22	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_12 , error = pingresp not received, disconnecting
2024-08-29 07:14:23	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_25 , error = pingresp not received, disconnecting
2024-08-29 07:14:24	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_97 , error = pingresp not received, disconnecting
2024-08-29 07:14:26	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_36 , error = pingresp not received, disconnecting
2024-08-29 07:15:08	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_63 , error = pingresp not received, disconnecting
2024-08-29 07:15:16	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_23 , error = pingresp not received, disconnecting
2024-08-29 07:15:19	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_96 , error = pingresp not received, disconnecting
2024-08-29 07:15:20	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_50 , error = pingresp not received, disconnecting
2024-08-29 07:15:25	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_75 , error = pingresp not received, disconnecting
2024-08-29 07:15:30	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_78 , error = pingresp not received, disconnecting
2024-08-29 07:15:36	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_7 , error = pingresp not received, disconnecting
2024-08-29 07:15:39	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_97 , error = pingresp not received, disconnecting
2024-08-29 07:16:17	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_79 , error = pingresp not received, disconnecting


package main

import (
	mqtt "github.com/eclipse/paho.mqtt.golang"

// MqttInterface 定义了 MQTT 客户端的基本接口
type MqttInterface struct {
	client mqtt.Client
	Id     string
	Chan   chan []byte
	Config MqttConfig
	wg     sync.WaitGroup

// NewMqttClient 初始化并返回一个新的 MqttInterface 实例
func NewMqttClient(id string, config MqttConfig) *MqttInterface {
	return &MqttInterface{
		Id:     id,
		Chan:   make(chan []byte, 1000),
		Config: config,

// Connect 连接到 MQTT 服务器
func (m *MqttInterface) Connect(host, username, password string, port int) error {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", host, port))
	opts.OnConnectionLost = func(client mqtt.Client, err error) {
		zap.S().Errorf("mqtt connection lost id = %s , error = %+v", m.Id, err)
		StopMqttClient(m.Id, m.Config)

	opts.SetKeepAlive(60 * time.Second)
	// 创建并启动客户端
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		return token.Error()

	m.client = client
	return nil

// messageHandler 处理接收到的消息
func (m *MqttInterface) messageHandler(client mqtt.Client, msg mqtt.Message) {


// Subscribe 订阅一个或多个主题
func (m *MqttInterface) Subscribe(topics string) error {
	var token = m.client.Subscribe(topics, 0, func(client mqtt.Client, msg mqtt.Message) {
		defer func() {
			//zap.S().Errorf("mqtt subscribe id = %s , topic = %s", m.Id, msg.Topic())
		mqttMsg := MQTTMessage{
			MQTTClientID: m.Id,
			Message:      string(msg.Payload()),
		jsonData, _ := json.Marshal(mqttMsg)

		m.Chan <- jsonData


	if token.Wait() && token.Error() != nil {
		return token.Error()
	return nil

// Publish 向一个主题发布消息
func (m *MqttInterface) Publish(topic string, payload interface{}) {
	token := m.client.Publish(topic, 0, false, payload)

// Disconnect 断开与 MQTT 服务器的连接
func (m *MqttInterface) Disconnect() {

func (m *MqttInterface) HandlerMsg() {
	for {
		c := <-m.Chan
		PushToQueue("pre_handler", c)


创建 MQTT 客户端和开启订阅

	client := NewMqttClient(clientId,config)
	err := client.Connect(broker, username, password, port)
	if err != nil {
		zap.S().Errorf("mqtt connect err = %v", err)
        return false
	go client.Subscribe(subTopic)
	go client.HandlerMsg()



  1. 我发起了一个 Issues ,我理解是让消息接收后进行异步处理 https://github.com/eclipse/paho.mqtt.golang/issues/686

  2. 修改程序如下

	var token = m.client.Subscribe(topics, 0, func(client mqtt.Client, msg mqtt.Message) {
		go func() {
			mqttMsg := MQTTMessage{
				MQTTClientID: m.Id,
				Message:      string(msg.Payload()),
			jsonData, _ := json.Marshal(mqttMsg)

			m.Chan <- jsonData



1222 次点击
所在节点    程序员
3 条回复
115 天前
115 天前
连 1883 抓包看 日志显示 ping 的回包没收到 断链
115 天前
就是正常来讲不会出现:mqtt connection lost id = TT_79 , error = pingresp not received, disconnecting 问题。 我正在尝试 1883 这个链接 。 我更新了一下 ISSUES


谢谢大家的回复 @ForrestWang @brucemaclin

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。


V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX