Appearance
nsq消息队列
部署
shell
## 下载安装包
wget https://github.com/nsqio/nsq/releases/download/v1.3.0/nsq-1.3.0.linux-amd64.go1.21.5.tar.gz
## 解压
tar -xvf nsq-1.3.0.linux-amd64.go1.21.5.tar.gz
cd nsq-1.3.0.linux-amd64.go1.21.5
## 启动
./bin/nsqlookupd &
./bin/nsqd --lookupd-tcp-address=127.0.0.1:4160 --mem-queue-size=0 &
./bin/nsqadmin --lookupd-http-address=127.0.0.1:4161 &
## 控制台页面
http://ip:4171/
golang接入
golang生产者
golang
package pub
import (
"fmt"
"github.com/nsqio/go-nsq"
)
func Publish() {
config := nsq.NewConfig()
producer, err := nsq.NewProducer("10.21.224.120:4150", config)
if err != nil {
panic(err)
}
err = producer.Ping()
if err != nil {
panic(err)
}
topicName := "go_queue"
for i := 0; i < 1000; i++ {
msg := []byte(fmt.Sprintf("senddata-%d", i))
err = producer.Publish(topicName, msg)
if err != nil {
fmt.Println(err)
}
fmt.Println("Message sent: ", string(msg))
}
}golang消费者
golang
package sub
import (
"fmt"
"log"
"github.com/nsqio/go-nsq"
)
// Subscribel creates an NSQ consumer for topic "go_queue" on channel
// "channel", registers a handler that prints each message body, and connects
// the consumer directly to the nsqd at 10.21.224.120:4150.
//
// Any error from creating the consumer or connecting terminates the process
// via log.Fatal. The function returns as soon as ConnectToNSQD succeeds; it
// does not wait here for messages.
// NOTE(review): the caller presumably has to keep the process alive for the
// handler to run — confirm against the go-nsq documentation.
func Subscribel() {
	// Build the consumer with the library's default configuration.
	cfg := nsq.NewConfig()
	c, err := nsq.NewConsumer("go_queue", "channel", cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Handler: print the payload and report success by returning nil.
	printBody := func(m *nsq.Message) error {
		fmt.Println("Received message:", string(m.Body))
		return nil
	}
	c.AddHandler(nsq.HandlerFunc(printBody))

	// Connect straight to nsqd.
	if err := c.ConnectToNSQD("10.21.224.120:4150"); err != nil {
		log.Fatal(err)
	}
}