thomaswang
V2EX  ›  问与答

golang nsq 源码哪一段表明 msg 会投递到 Topic 下所有的 Channel

  •  
  •   thomaswang · Sep 25, 2018 · 1466 views
    This topic was created 2815 days ago; the information mentioned may have changed since then.

    我咋找不到呢

    // PutMessage hands m to the topic via put and, when that succeeds,
    // increments the topic's message counter.
    //
    // The topic's read lock is held for the whole call. If the exit flag is
    // set, the message is rejected with an "exiting" error and nothing is
    // written. Any error from put is returned unchanged and the counter is
    // left untouched.
    func (t *Topic) PutMessage(m *Message) error {
    	t.RLock()
    	defer t.RUnlock()

    	// Reject new messages once the exit flag has been raised.
    	if exiting := atomic.LoadInt32(&t.exitFlag) == 1; exiting {
    		return errors.New("exiting")
    	}

    	if err := t.put(m); err != nil {
    		return err
    	}

    	// Only messages accepted by put are counted.
    	atomic.AddUint64(&t.messageCount, 1)
    	return nil
    }
    
    // put delivers m to the topic's in-memory channel when a send can proceed
    // immediately; otherwise it writes the message to t.backend.
    //
    // On the backend path the outcome of the write (nil or an error) is always
    // passed to nsqd.SetHealth. A failed write is logged at LOG_ERROR and the
    // error is returned to the caller.
    func (t *Topic) put(m *Message) error {
    	// Fast path: non-blocking send on the memory channel.
    	select {
    	case t.memoryMsgChan <- m:
    		return nil
    	default:
    	}

    	// The send would have blocked: write to the backend using a scratch
    	// buffer borrowed from bufferPoolGet and handed back right after.
    	buf := bufferPoolGet()
    	writeErr := writeMessageToBackend(buf, m, t.backend)
    	bufferPoolPut(buf)

    	t.ctx.nsqd.SetHealth(writeErr)
    	if writeErr == nil {
    		return nil
    	}

    	t.ctx.nsqd.logf(LOG_ERROR,
    		"TOPIC(%s) ERROR: failed to write message to backend - %s",
    		t.name, writeErr)
    	return writeErr
    }
    
    1 reply    2018-09-25 22:11:02 +08:00
    thomaswang
        1
    thomaswang  
    OP
       Sep 25, 2018
    ```go
    func (t *Topic) messagePump() {
    for i, channel := range chans {
    chanMsg := msg
    // copy the message because each channel
    // needs a unique instance but...
    // fastpath to avoid copy if its the first channel
    // (the topic already created the first copy)
    if i > 0 {
    chanMsg = NewMessage(msg.ID, msg.Body)
    chanMsg.Timestamp = msg.Timestamp
    chanMsg.deferred = msg.deferred
    }
    if chanMsg.deferred != 0 {
    channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
    continue
    }
    err := channel.PutMessage(chanMsg)
    if err != nil {
    t.ctx.nsqd.logf(LOG_ERROR,
    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
    t.name, msg.ID, channel.name, err)
    }
    }
    ```
    About   ·   Help   ·   Advertise   ·   Blog   ·   API   ·   FAQ   ·   Solana   ·   3072 Online   Highest 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 12:54 · PVG 20:54 · LAX 05:54 · JFK 08:54
    ♥ Do have faith in what you're doing.