Skip to content

nsq消息队列

部署

shell
## 下载安装包
wget https://github.com/nsqio/nsq/releases/download/v1.3.0/nsq-1.3.0.linux-amd64.go1.21.5.tar.gz

## 解压
tar -xvf nsq-1.3.0.linux-amd64.go1.21.5.tar.gz

cd nsq-1.3.0.linux-amd64.go1.21.5

## 启动
./bin/nsqlookupd &
./bin/nsqd --lookupd-tcp-address=127.0.0.1:4160 --mem-queue-size=0 &
./bin/nsqadmin --lookupd-http-address=127.0.0.1:4161 &

## 控制台页面
http://ip:4171/

golang接入

golang生产者

golang
package pub

import (
	"fmt"

	"github.com/nsqio/go-nsq"
)

func Publish() {
	config := nsq.NewConfig()
	producer, err := nsq.NewProducer("10.21.224.120:4150", config)
	if err != nil {
		panic(err)
	}

	err = producer.Ping()
	if err != nil {
		panic(err)
	}

	topicName := "go_queue"
	for i := 0; i < 1000; i++ {
		msg := []byte(fmt.Sprintf("senddata-%d", i))
		err = producer.Publish(topicName, msg)
		if err != nil {
			fmt.Println(err)
		}
		fmt.Println("Message sent: ", string(msg))
	}
}

golang消费者

golang
package sub

import (
	"fmt"
	"log"

	"github.com/nsqio/go-nsq"
)

func Subscribel() {
	// 创建一个NSQ消费者
	consumer, err := nsq.NewConsumer("go_queue", "channel", nsq.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	// 设置消息处理函数
	consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
		// 处理接收到的消息
		fmt.Println("Received message:", string(message.Body))
		return nil
	}))
	// 连接NSQD
	err = consumer.ConnectToNSQD("10.21.224.120:4150")
	if err != nil {
		log.Fatal(err)
	}
}