目录

nsq消息队列

nsq消息队列

NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。

Features 特点

  1. Distributed NSQ提供了分布式的,去中心化,且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和HA(高可用)特性。
  2. Scalable易于扩展 NSQ支持水平扩展,没有中心化的brokers。内置的发现服务简化了在集群中增加节点。同时支持pub-sub和load-balanced 的消息分发。
  3. Ops Friendly NSQ非常容易配置和部署,生来就绑定了一个管理界面。二进制包没有运行时依赖。官方有Docker image。
  4. Integrated高度集成 官方的 Go 和 Python库都有提供。而且为大多数语言提供了库。
  5. 可延时接收

组件

Topic :一个topic就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会创建topic。

Channels :channel与消费者相关,是消费者之间的负载均衡,channel在某种意义上来说是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到所有消费者连接的channel上,消费者通过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会创建channel。Channel会将消息进行排列,如果没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。

Messages:消息构成了我们数据流的中坚力量,消费者可以选择结束消息,表明它们正在被正常处理,或者重新将他们排队待到后面再进行处理。每个消息包含传递尝试的次数,当消息传递超过一定的阀值次数时,我们应该放弃这些消息,或者作为额外消息进行处理。

nsqd:nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的(它在这能声明 topics 和 channels,以便大家能找到)。

nsqlookupd:nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息。有两个接口:TCP 接口,nsqd 用它来广播。HTTP 接口,客户端用它来发现和管理。 nsqadmin:nsqadmin 是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务。

常用工具类: nsq_to _file:消费指定的话题(topic)/通道(channel),并写到文件中,有选择的滚动和/或压缩文件。

nsq_to _http:消费指定的话题(topic)/通道(channel)和执行 HTTP requests (GET/POST) 到指定的端点。

nsq_to _nsq:消费者指定的话题/通道和重发布消息到目的地 nsqd 通过 TCP。

组件通讯 [文件] ./1548148271499.png nsqd 分发数据 这是官方的图,第一个channel(meteics)因为有多个消费者,所以触发了负载均衡机制。后面两个channel由于没有消费者,所有的message均会被缓存在相应的队列里,直到消费者出现 ./1548150421814.gif

运行服务

首先启动nsqlookud nsqlookupd

启动nsqd,并接入刚刚启动的nsqlookud。这里为了方便接下来的测试,启动了两个nsqd nsqd –lookupd-tcp-address=127.0.0.1:4160 nsqd –lookupd-tcp-address=127.0.0.1:4160 -tcp-address=0.0.0.0:4152 -http-address=0.0.0.0:4153

启动nqsadmin(不是必须的) nsqadmin –lookupd-http-address=127.0.0.1:4161

go使用nsq

nsq提供了go客户端库 生产者:

var producer *nsq.Producer

func main() {
	nsqd := "127.0.0.1:4150"
	producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
	for i := 0; i < 5; i++ {
		producer.Publish("test", []byte("nihao"))
	}
	if err != nil {
		panic(err)
	}
}

消费者:

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"sync"
)

type NSQHandler struct {
	id int
}

func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
	fmt.Println(this.id,"receive", msg.NSQDAddress, "message:", string(msg.Body))
	return nil
}

func testNSQ() {
	waiter := sync.WaitGroup{}
	waiter.Add(1)

	go func() {
		defer waiter.Done()
		config:=nsq.NewConfig()
		config.MaxInFlight=9

		//建立多个连接
		for i := 0; i<10; i++ {
			consumer, err := nsq.NewConsumer("test", "struggle", config)
			if nil != err {
				fmt.Println("err", err)
				return
			}
			consumer.AddHandler(&NSQHandler{i})
			err = consumer.ConnectToNSQD("127.0.0.1:4150")
			if nil != err {
				fmt.Println("err", err)
				return
			}
		}
		select{}

	}()
	waiter.Wait()
}

func main() {
	testNSQ()
}

总结

事实上,简单性是我们决定使用NSQ的首要因素,这方便与我们的许多其他软件一起维护,通过引入队列使我们得到了堪称完美的表现,通过队列甚至让我们增加了几个数量级的吞吐量。越来越多的consumer需要一套严格可靠性和顺序性保障,这已经超过了NSQ提供的简单功能。

NSQ 的 TCP 协议是面向 push 的。另外它是无序的,可能有重复数据,这个要根据实际需求考虑。