nsq消息队列
nsq消息队列
NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。
Features 特点
- Distributed NSQ提供了分布式的,去中心化,且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和HA(高可用)特性。
- Scalable易于扩展 NSQ支持水平扩展,没有中心化的brokers。内置的发现服务简化了在集群中增加节点。同时支持pub-sub和load-balanced 的消息分发。
- Ops Friendly NSQ非常容易配置和部署,生来就绑定了一个管理界面。二进制包没有运行时依赖。官方有Docker image。
- Integrated高度集成 官方的 Go 和 Python库都有提供。而且为大多数语言提供了库。
- 可延时接收
组件
Topic :一个topic就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会创建topic。
Channels :channel与消费者相关,是消费者之间的负载均衡,channel在某种意义上来说是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到所有消费者连接的channel上,消费者通过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会创建channel。Channel会将消息进行排列,如果没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。
Messages:消息构成了我们数据流的中坚力量,消费者可以选择结束消息,表明它们正在被正常处理,或者重新将他们排队待到后面再进行处理。每个消息包含传递尝试的次数,当消息传递超过一定的阀值次数时,我们应该放弃这些消息,或者作为额外消息进行处理。
nsqd:nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的(它在这能声明 topics 和 channels,以便大家能找到)。
nsqlookupd:nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息。有两个接口:TCP 接口,nsqd 用它来广播。HTTP 接口,客户端用它来发现和管理。 nsqadmin:nsqadmin 是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务。
常用工具类: nsq_to _file:消费指定的话题(topic)/通道(channel),并写到文件中,有选择的滚动和/或压缩文件。
nsq_to _http:消费指定的话题(topic)/通道(channel)和执行 HTTP requests (GET/POST) 到指定的端点。
nsq_to _nsq:消费者指定的话题/通道和重发布消息到目的地 nsqd 通过 TCP。
组件通讯 [文件] nsqd 分发数据 这是官方的图,第一个channel(meteics)因为有多个消费者,所以触发了负载均衡机制。后面两个channel由于没有消费者,所有的message均会被缓存在相应的队列里,直到消费者出现
运行服务
首先启动nsqlookud nsqlookupd
启动nsqd,并接入刚刚启动的nsqlookud。这里为了方便接下来的测试,启动了两个nsqd nsqd –lookupd-tcp-address=127.0.0.1:4160 nsqd –lookupd-tcp-address=127.0.0.1:4160 -tcp-address=0.0.0.0:4152 -http-address=0.0.0.0:4153
启动nqsadmin(不是必须的) nsqadmin –lookupd-http-address=127.0.0.1:4161
go使用nsq
nsq提供了go客户端库 生产者:
var producer *nsq.Producer
func main() {
nsqd := "127.0.0.1:4150"
producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
for i := 0; i < 5; i++ {
producer.Publish("test", []byte("nihao"))
}
if err != nil {
panic(err)
}
}
消费者:
import (
"fmt"
"github.com/nsqio/go-nsq"
"sync"
)
type NSQHandler struct {
id int
}
func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
fmt.Println(this.id,"receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
func testNSQ() {
waiter := sync.WaitGroup{}
waiter.Add(1)
go func() {
defer waiter.Done()
config:=nsq.NewConfig()
config.MaxInFlight=9
//建立多个连接
for i := 0; i<10; i++ {
consumer, err := nsq.NewConsumer("test", "struggle", config)
if nil != err {
fmt.Println("err", err)
return
}
consumer.AddHandler(&NSQHandler{i})
err = consumer.ConnectToNSQD("127.0.0.1:4150")
if nil != err {
fmt.Println("err", err)
return
}
}
select{}
}()
waiter.Wait()
}
func main() {
testNSQ()
}
总结
事实上,简单性是我们决定使用NSQ的首要因素,这方便与我们的许多其他软件一起维护,通过引入队列使我们得到了堪称完美的表现,通过队列甚至让我们增加了几个数量级的吞吐量。越来越多的consumer需要一套严格可靠性和顺序性保障,这已经超过了NSQ提供的简单功能。
NSQ 的 TCP 协议是面向 push 的。另外它是无序的,可能有重复数据,这个要根据实际需求考虑。