消息队列

2018-03-04 fishedee 后端

1 概述

消息队列,无论是在单一系统还是在分布式系统,消息队列都是不可或缺的一个重要部分。

2 作用

2.1 解耦

解耦可能是消息队列最常见的模型,它来源于设计模式中的订阅者模式。例如,用户注册成功后,需要对新注册者增加初始积分,对推荐者增加推荐积分。

func RegisterUser(user User){
    this.userDb.Register(user)
    
    this.pointAo.newAdd(user.UserId)
    this.pointAo.recommendAdd(user.RecommendUserId)
}

由于用户模块需要通知积分模块,所以用户模块的逻辑越来越重,它需要一个一个地通知这些额外的模块。更可怕的是,这些事情对于用户模块本身没有什么意义,它只是作为一个通知者而已,有没有通知成功本身并没有影响它自身是否注册用户成功的逻辑。

func Update(userId int,name string){
    this.userDb.Update(userId,name)
    
    this.pointAo.updateUserCache(userId)
    this.dealAo.updateUserCache(userId)
    this.transportAo.updateUserCache(userId)
}

另外,一个场景是,获取用户ID对应的用户名字信息时,积分系统,订单系统,物流系统都有这个需求,为了性能的考虑,他们一般都会缓存用户的名字信息。但是,当用户系统的系统信息修改时,这些依赖它的模块却需要及时的通知。一个暴力的方法是让用户系统去逐个通知它们。这造成了用户系统很烦恼,缓存是你们这些外部系统加的,为什么要我来负责做更新这个事情。

func RegisterUser(user User){
    this.userDb.Register(user)
    this.queue.Pushlish('/user/register',user)
}

this.queue.Subscribe('/user/register',this.pointAo.newAdd)
this.queue.Subscribe('/user/register',this.pointAo.recommendAdd)

队列的作用是将依赖反过来,其他系统提前订阅了这个topic的信息,当这个topic的信息被触发时,由队列来负责通知各个系统。用户系统本身既不需要处理通知的事情,也不需要知道哪个模块需要被通知,这些事情都被队列所处理好了。

2.2 错峰流控

队列的另外一个重要作用是执行异步,和错峰流控

func RegisterUser(user User){
    this.userDb.Register(user)
    
    this.mailAo.send(user.Mail,'注册成功')
}

注册用户后需要发送一封邮件给用户,在没有队列的情况下,由于邮件需要与外部系统连接,一旦外部系统崩溃,就会拖累着注册用户的系统崩溃。我们希望做到的是隔离这两个部分,更新db很快,但是跟外部系统发邮件却很慢,他们不能在一个同步操作中完成。而且,即使是异步操作,发送邮件的并发量也不能太大,避免被对方拉入恶意攻击黑名单,所以也需要流量控制。

func RegisterUser(user User){
    this.userDb.Register(user)
    this.queue.Produce('/mail/send',user.Mail,'注册成功')
}

this.queue.Consume('/mail/send',this.mailAo.send)

队列的作用是,将发送邮件放入到队列中,由队列通知邮箱系统来往外发送邮件。而无论工作量有多庞大,同时最多只有N个邮箱进程在往外并发发邮件,一旦邮件太多忙不过来,这些邮件就会堆积在队列中等候发送。这样做既避免了邮箱系统拖垮了用户系统,又避免了邮箱系统并发量太大,有效地控制了最大流量,避免了峰值流量下雪崩的问题。

2.3 最终一致性事务

微服务的成本,其中的一个莫过于分布式事务了。在以前的单一系统中,要保证数据是强一致性的,相当简单,直接在db层begin,然后commit启动一个事务就可以了,由数据库的ACID特性保证不可能出现数据不一致的情况。但是,在分布式的环境下,我们既要保证简单,又要保证一致性,还要兼顾效率,这真是一个要命的问题。

update t_money set money -= 100 where userId=A
update t_money set money += 100 where userId=B

在最经典的转账问题中,从A转账到B,就是两个语句。在单一系统中,就是begin与commit。在分布式系统中,我们需要使用最终一致性的方法,使用定时器来找出数据不一致的情况,然后不断重试补偿以达到数据最终的一致性。在分布式存储概览中的重试与幂等中,我们曾经谈论过这个问题。

A给B转账的事件插入任务队列,并分配一个全局唯一的任务ID为UUID。

设置UUID下的任务状态为未开始

A用户所在数据库开启事务
A用户增加操作日志,UUID任务下减少100元
A用户金额减少100元
A用户所在数据库提交事务

B用户所在数据库开启事务
B用户增加操作日志,UUID任务下增加100元
B用户金额增加100元
B用户所在数据库提交事务

设置UUID下的任务状态为已完成

当时,我们提出的解决方案就是以上这个方案。这个方案是没有问题的,但是在流程复杂的环境下是无法实现的,因为它是要么不重试,一重试就将所有流程都重试一遍。如果A成功了,只有B失败了,也要将全部流程重新重试一遍。

A给B转账的事件插入任务队列,并分配一个全局唯一的任务ID为UUID。

设置UUID下的任务状态为未开始

A用户所在数据库开启事务
A用户增加操作日志,UUID任务下减少100元
A用户金额减少100元
A用户所在数据库提交事务

设置UUID下的任务状态为已完成一半

B用户所在数据库开启事务
B用户增加操作日志,UUID任务下增加100元
B用户金额增加100元
B用户所在数据库提交事务

设置UUID下的任务状态为已完成

一个很直观的办法就是,在A完成后,再插入一句,将状态扭转为完成了一半。这样重试的时候就不需要整个流程都重试一遍。但这样会带来另外一个问题,由于定时器需要知道每个状态对应的重试开始位置,所以这个重试的定时器逻辑是无法复用的,在每一个需要最终一致性的地方,都需要写一份独特的定时器重试逻辑。

好了,如果我们换个思路,用队列来解决这个问题,方案就会相当优雅漂亮了。UUID的任务状态为队列的topic,每个任务状态对应的重试位置,换为队列的subscriber对于topic的倾听。

A用户所在数据库开启事务
A用户金额减少100元
对queue发送/money/inc的topic
A用户所在数据库提交事务

B用户接收到/money/inc的topic
B用户所在数据库开启事务
B用户金额增加100元
对queue发送/money/finish的topic
B用户所在数据库提交事务

前端接收到/money/finish的topic,然后向用户返回已经转账成功的提示

我们假设数据可能不一致的情况,会发生什么:

  • 如果A步骤失败了,那么queue里面也没有事件,这个转账事件就当没有发生过,数据不会出现不一致的情况。
  • 如果A步骤成功了,但B步骤失败了。那么queue中的/money/inc事件由于没有收到ack回答,就会不断向B服务重试请求,直到B服务返回了ack回答,这个时候数据就会达到最终一致性。
  • 如果A步骤成功了,但B步骤成功了,但发送/money/finish事件失败了。那么queue中的/money/inc事件由于没有收到ack回答,就会不断向B服务重试请求,由于B服务幂等性的原因,B服务很快就返回ack请求,并重新发送了/money/fish的topic了。这种情况下,可能导致多次发送/money/fish的topic。

但是,如果A步骤成功了,但是在发送/money/inc事件时失败了,那么队列中缺少事件就会失去了对B服务发送请求的事情,一样会出现数据不一致的问题,怎么办?

A用户所在数据库开启事务
A用户金额减少100元
本地队列数据库加入/money/inc的topic
A用户所在数据库提交事务

B用户接收到/money/inc的topic
B用户所在数据库开启事务
B用户金额增加100元
本地队列数据库加入/money/finish的topic
B用户所在数据库提交事务

前端接收到/money/finish的topic,然后向用户返回已经转账成功的提示

这里在稍加改动一下,将直接发送数据到队列,改为将发送数据写入本地数据库,由定时器将发送数据来推入队列。这样,就能保证有且仅有一次的队列发送事件了。并且同时实现了多阶段重试,最终一致性事务组件可重用的问题,很是优雅。这里有更加详细的流程图可以参考)

另外,定时器的扫描发送方式,可以进一步优化,改为先发送prepare消息,再发送confirm消息,这样能降低定时器对数据库的扫描压力。详情看这里

2.4 定时器

队列的另外一个作用是作为分布式的定时器。例如,我们有一个任务,需要在每1分钟执行一个任务,这个任务的耗时平均需要2分钟,所以我们部署到2部机器上去处理。一个简单的方式,是让定时器轮询着让这两台机器来执行这个定时任务。但是,如果一旦某个时段,这个任务耗时比较长,下一次轮询时也是轮到它的话,就会造成下一个任务也会被延迟处理了,也就是任务不能做到均衡分配。

引用队列的方式,我们让定时器定时向队列发送一个topic,然后让两台机器竞争地在这个topic中获取消息,谁获取到消息的就去执行定时任务,这样就能很好地实现任务的均衡分配了。而且,看队列的事件堆积情况,我们也能看出定时任务是不是都能及时被处理掉了。

2.5 请求合并

有些时候,我们希望将多个请求合并成一个请求来出来,这样的处理效率要高得多。例如,在踩楼的活动场景中,在关键楼层的新增发帖的用户要多得多。对于每个发帖的处理,我们都需要开启一个事务来处理,这时候多个请求之间需要不断处理并发冲突的问题。

引用队列的方式,我们将所有发帖的请求都直接扔到一个队列中,然后同时只有2个并发在处理这些发帖数据。在关键楼层的时候,这2个并发处理不过来,但队列如果能将相邻的事件一次合并直接发给处理器来处理,那么这些合并事件就不再需要竞争性插入数据了,这大大地提高了插入性能和系统吞吐量。

3 模型

3.1 JMS

3.1.1 约定

JMS协议的想法来自于ui中的发布订阅模式。它支持生产者消费者的queue消费模式,和发布订阅的topic通知模式。

queue消费模式就是producer发布的消息,有且只有一个消费者能够被消费到,而且当消费者未上线时,未能及时消费的消息将会堆积在队列中,当消费者重新上线后会重新推送给消费者。

topic通知模式就是publisher发布的消息,所有的订阅这个topic的订阅者都会接收到这个同一个的消息。但是,跟queue消费模式不同的是,当其中一个订阅者未上线时,这个消息就会被忽略掉,即使订阅者重新上线,这个消息也不会重新推给它。简单来说,topic通知模式仅仅作为消息转发,不会做消息存储。

3.1.2 实现

以类似JMS约定的队列模式实现有:

  • activemq
  • zeromq
  • redis,pub/sub,bpush/brpop

3.1.3 场景

JMS协议的约定是最为直接的队列模式约定,queue消费模式能解决错峰流控,事务消息的问题,但是它的topic通知模式就很鸡肋,当消息队列用作解耦时根本就不希望消息会被丢失,这样会有严重问题的!

另外,这样的实现也有严重的性能问题,因为发布订阅模式是没有消息缓存的,每次收到消息时都必须马上对所有订阅者转发该消息。一旦发布者发出的消息密集时,这些消息就会被直接堆在队列的内存上,直接打爆队列。而且,更大的问题是,每次收到一个topic消息后,需要通知完所有的订阅者,才能继续通知下一个消息,如果其中一个订阅者延迟确认了,那么其他的订阅者就会受到牵连,接收消息的延迟也变大了。(当然,队列可以选择不等所有订阅者确认就发下一个消息,但这很有可能会直接打爆最慢的那个订阅者)

所以,无论是在功能还是在性能上,该实现模式对发布订阅者模式都比较糟糕。唯一的优点是,使用比较简单,只需要一个topic就可以了。

3.2 AMQP

3.2.1 约定

AMQP从模式上统一了生产者消费者,和,发布者订阅者两种模式,它引入了两个概念,exchange,queue。

无论是订阅者还是消费者,他都有自己所属的queue队列,他们在接收事件时,需要指定queue的名字。而发布者和生产者,他们发布的消息时需要指定的不是queue,而是topic。当这些消息到达消息服务器后,由队列的exchanger,将消息转发到topic相关联的所有queue上面就可以了。

这样做有什么改变呢?

生产者和消费者模式下,所有的消费者他们的queue名字都是一样的,一个topic就只对应一个queue,所以运行起来就和JMS的一样,没区别。

发布者和订阅者模式下,不同的订阅者它们是有不同的queue,名字都是不一样的。当然,同一订阅者组的各个订阅者它们的名字是一样的。这样的话,一个topic就是对应多个queue,和JMS一样,发布一个消息后,不同订阅者组都能同时获取到这个消息。

但是,和JMS不同的是,由于不同的订阅者组都有自己独立的queue,所以消息队列就能根据不同的queue缓存起来它们未读的消息。当订阅者组下线后,消息会被缓存起来,等订阅者重新上线后,将消息重新发给它就可以了。而且,也由于不同订阅者组有属于自己独立的queue,队列服务器直接将消息复制到每个队列就可以了,不再需要协调所有的订阅者收到消息后才去发下一个消息。

AMQP中有三种路由方式

fanout就是直接将绑定在exchanger的所有queue都通知

direct就是路由时,根据routing key全匹配queue的routing key时才会转发

topic就是路由时,允许前缀匹配routing key的转发形式

显然,在性能上fanout>direct>>topic

3.2.2 实现

以类似AMQP约定的队列模式实现有:

  • rabbitmq
  • rocketmq
  • nsq

3.2.3 场景

AMQP协议大大增强了队列的发布订阅性能,而且消费者的概念也让功能变得更加灵活强大,对于绝大部份的应用场景来说,这一类的消息队列已经是很足够了,不需要再进一步进化了。稍加一点的麻烦是,订阅时不仅需要指定queueName,而且也需要在消息服务器中预先填写好queueName与topic的映射关系(部分消息队列协议支持动态建立映射关系)

要说唯一的缺点就是,AMQP协议对于海量数据的支持依然不足。为什么?

  • exchanger的转发瓶颈,对于一个topic对应海量queue时,消息会被复制很多次,造成吞吐量低。
  • 同一消费者组下的不同消费者并发竞争瓶颈,对于同一消费者组下的不同消费者,他们都在竞争着获取队列头部的数据,如果消费者很多时,这种竞争就需要考虑分配的公平和效率的问题,造成整体吞吐量急剧下降。
  • 同一个topic下的不同生产者并发竞争瓶颈,与消费者一样,不同的生产者它们都在竞争着往队列尾部添加数据,同样造成吞吐量也下降。

什么场景会有这样的情况?最经典的就是日志分析场景,每台机器都将自己的日志添加到同一个topic中,当机器很多时,amqp协议就会无能为力,根本解决不了。这个海量问题的出现最终由netflix解决了,它提出的方案就是kafka。

3.3 kafka

3.3.1 约定

kafka和amqp一样,有消费者组和消费者的模型,但是没有topic与queue映射的模型,同时加入了分区的模型。

首先,kafka对于每个topic只有一个队列,即使时不同的消费者组也只有一个队列,而且共享一个队列。那么不同的消费者组如何获取属于自己的事件数据呢?答案是,由消费者组自己去传入一个offset,代表读取当前队列的第N个消息,当消费者想读取下一个数据时,就自己将offset+1,然后传入到kafka中获取下一个消息的数据。换句话说,amqp帮你维护你当前读到哪一个消息,而kafka则什么都不管,由客户端自己维护当前应该读到哪一个消息。

这样做的缺点很明显,就是客户端变得麻烦,需要额外一个offset。但是优点也很明显,随时新增一个客户端,然后将offset设置为0,就能实现回放队列所有的历史数据!同时,对于那些处理遇到异常的消息,可以由客户端自己决定是否重试,重试的话需要重试多少次的问题,非常灵活。而且对kafka自身也有好处,服务器自身更加轻量,也不需要做转发器,和消息复制的操作了,吞吐量当然更加大。

除此以外,一个最重要的改变是,引入分区。我们之前谈到amqp由于需要竞争队列头部和尾部的问题,性能急剧下降。kafka的方法就是,将逻辑上的一个队列划分为物理上的多个队列,也就是多个分区。当消费者读取数据时,他就会被预先分配到指定的分区读取数据。例如,如果有4个分区,1个消费者,那么这个消费者就会被分配到4个分区的数据读取任务。如果有4个分区,2个消费者,那么每个消费者就会被分配到2个分区的数据读取任务。换言之,kafka就是要让每个消费者(同一个消费者组)每次只能读取一个分区上的数据,不允许多个消费者(同一个消费者组)读取同一个分区上的数据,这样就能避免消费者读取数据的竞争问题!复杂的地方在于,当消费者动态增加减少时,kafka也要及时维护这种分区关系,分布式维护这个信息的工具就是zookeeper。同样的道理,每个生产者在生产时就被指定了分区的位置就可以了,这样就能避免生产者的竞争问题!完美!

可能你会说,当生产者数量比分区数量多时,难免会产生多个生产者竞争写入到同一个分区的问题。解决方法很简单,让kafka动态扩容,新增一定的空白分区,同时将这些空白分区平均分配给消费者就可以了。

3.3.2 实现

目前使用这种方法实现队列模型的只有一个,kafka。

3.3.3 场景

由于这个方法实在太牛逼了,吞吐量大增,可扩展性达到几乎无限大,容错性非常好。而且,用起来简单,kafka不像amqp会删除读取过的数据,他会将队列数据都保存下来,随时都能回放。

大家刚开始就只是把日志放进去,后来连大数据都放进去了。例如,动态抓取的爬虫数据,用户点击数据,访问数据等等。然后在kafka的另外一端将数据读出来,做流式计算,这样就能实现大数据的实时计算了,慢慢地衍生出kafka的生态系统,形成成熟的kafka streams大数据处理方式。

4 接收方式

对于接收端如何接收方式,目前有三种方案。

4.1 推送

当服务器收到消息后,主动推送给消费者。好处是开发简单,时效性好,但问题是,当消费者来不及处理消息时,这种往死里推消息的方式,很有可能直接打爆消费者。

所以,目前,这种方式出现了变种,当消费者收到消息后,需要返回一个ACK包给服务器。服务器按照当前发送的消息量和ACK量来确定是否需要将消息暂存服务器,还是仍然继续推送消费者。

但是,总体而言,这种方式依然没有很好地解决不同消费者的处理速度和承受能力问题。

这是目前rabbitmq的处理方式

4.2 拉取

服务器不推送消息了,由消费者主动向服务器拉取消息,当消息不存在时,消费者马上返回,然后等候一个时间段以后再继续查询服务器。优点是很好地解决了消费者的处理速度和承受能力的问题,缺点是时效性低,而且消息量小时不断轮询导致瞎忙,浪费资源。

这是之前kafka的处理方式

4.3 长连接

消费者主动发送一个连接给服务器,服务器hold住连接,直到收到消息后返回给消费者。当然,也可以确定一个超时时间,让消费者空等一段时间后先提前返回,实现keeplive。这样做可以说是最好的,既考虑了消费者的处理速度和承受能力,同时时效性不错,也不会导致瞎忙。唯一缺点就是对服务器的开发要求比较高,需要同时hold住大量的长连接。

目前这个方法是业界最优方案,redis,nsq,rocketmq,kafka都是这个方案。

5 总结

这只是消息队列的一个皮毛,还需要继续学习。

参考资料

相关文章