首页 手机文章正文

消息队列选型看这一篇就够了

手机 2024年09月22日 12:31 122 admin

作者:emoryliang

消息队列是重要的分布式系统组件,在高性能、高可用、低耦合等系统架构中扮演着重要作用。可用于异步通信、削峰填谷、解耦系统、数据缓存等多种业务场景。本文是关于消息队列(MQ)选型和常见问题的精心整理。在这篇文章中,我们将详细介绍消息队列的概念、作用以及如何选择适合自己需求的消息队列系统。

1概述

消息队列是分布式系统中重要的中间件,在高性能、高可用、低耦合等系统架构中扮演着重要作用。分布式系统可以借助消息队列的能力,轻松实现以下功能:

2架构简介

2.1Kafka

2.1.1系统框架

一个Kafka集群由多个Broker和一个ZooKeeper集群组成,Broker作为Kafka节点的服务器。同一个消息主题Topic可以由多个分区Partition组成,分区物理存储在Broker上。负载均衡考虑,同一个Topic的多个分区存储在多个不同的Broker上,为了提高可靠性,每个分区在不同的Broker会存在副本。

ZookKeeper是一个分布式开源的应用程序协调服务,可以实现统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等工作。Kafka里的ZooKeeper主要有一下几个作用:

2.1.2基本术语

Producer

:消息生产者。一般情况下,一条消息会被发送到特定的主题上。通常情况下,写入的消息会通过轮询将消息写入各分区。生产者也可以通过设定消息key值将消息写入指定分区。写入分区的数据越均匀Kafka的性能才能更好发挥。

Topic

:Topic是个抽象的虚拟概念,一个集群可以有多个Topic,作为一类消息的标识。一个生产者将消息发送到topic,消费者通过订阅Topic获取分区消息。

Partition

:Partition是个物理概念,一个Topic对应一个或多个Partition。新消息会以追加的方式写入分区里,在同一个Partition里消息是有序的。Kafka通过分区,实现消息的冗余和伸缩性,以及支持物理上的并发读、写,大大提高了吞吐量。

Replicas

:一个Partition有多个Replicas副本。这些副本保存在broker,每个broker存储着成百上千个不同主题和分区的副本,存储的内容分为两种:master副本,每个Partition都有一个master副本,所有内容的写入和消费都会经过master副本;follower副本不处理任何客户端的请求,只同步master的内容进行复制。如果master发生了异常,很快会有一个follower成为新的master。

Consumer

:消息读取者。消费者订阅主题,并按照一定顺序读取消息。Kafka保证每个分区只能被一个消费者使用。

Offset

:偏移量是一种元数据,是不断递增的整数。在消息写入时Kafka会把它添加到消息里。在分区内偏移量是唯一的。消费过程中,会将最后读取的偏移量存储在Kafka中,消费者关闭偏移量不会丢失,重启会继续从上次位置开始消费。

2.2Pulsar

2.2.1系统框架

Pulsar有三个重要的组件,Broker、BookKeeper和ZooKeeper,Broker是无状态服务,客户端需要连接到Broker上进行消息的传递。BookKeeper与ZooKeeper是有状态服务。BookKeeper的节点叫Bookie,负责存储消息和游标,ZooKeeper存储Broker和Bookie的元数据。Pulsar以这种架构,实现存储和计算分离,Broker负责计算,Bookie负责有状态存储。

Pulsar的多层架构影响了存储数据的方式。Pulsar将Topic分区划分为分片(Segment),然后将这些分片存储在ApacheBookKeeper的存储节点上,以提高性能、可伸缩性和可用性。Pulsar的分布式日志以分片为中心,借助扩展日志存储(通过ApacheBookKeeper)实现,内置分层存储支持,因此分片可以均匀地分布在存储节点上。由于与任一给定Topic相关的数据都不会与特定存储节点进行捆绑,因此很容易替换存储节点或缩扩容。另外,集群中最小或最慢的节点也不会成为存储或带宽的短板。

2.2.2基本术语

Property

:代表租户,每个property都可以代表一个团队、一个功能、一个产品线。一个property可包含多个namesapce,多租户是一种资源隔离手段,可以提高资源利用率;

Namespace

:Pulsar的基本管理单元,在namaspace级别可设置权限、消息TTL、Retention策略等。一个namaspace里的所有topic都继承相同的设置。命名空间分为两种:本地命名空间,只在集群内可见、全局命名空间对多个集群可见集群命名空间;

:数据生产方,负责创建消息并将消息投递到Pulsar中;

:数据消费方,连接到Pulsar接收消息并进行相应的处理;

Broker

:无状态Proxy服务,负责接收消息、传递消息、集群负载均衡等操作,它对client屏蔽了服务端读写流程的复杂性,是保证数据一致性与数据负载均衡的重要角色。Broker不会持久化保存元数据。可以扩容但不能缩容;

BookKeeper

:有状态,负责持久化存储消息。当集群扩容时,Pulsar会在新增BookKeeper和Segment(即Bookeeper的Ledger),不需要像kafka一样在扩容时进行Rebalance。扩容结果是Fragments跨多个Bookies以带状分布,同一个Ledger的Fragments分布在多个Bookie上,导致读取和写入会在多个Bookies之间跳跃;

ZooKeeper

:存储Pulsar、BookKeeper的元数据,集群配置等信息,负责集群间的协调、服务发现等;

:用作从producer到consumer传输消息。Pulsar在Topic级别拥有一个leaderBroker,称之为拥有Topic的所有权,针对该Topic所有的R/W都经过该Broker完成。Topic的Ledger和Fragment之间映射关系等元数据存储在Zookeeper中,PulsarBroker需要实时跟踪这些关系进行读写流程;

Ledger

:即Segment,Pulsar底层数据以Ledger的形式存储在BookKeeper上。是Pulsar删除的最小单位;

Fragment

:每个Ledger由若干Fragment组成。

2.3RocketMQ

2.3.1系统框架

RocketMQ是阿里开源的消息中间件,它是一个开源的分布式消息传递和流式数据平台。总共有四大部分:NameServer,Broker,Producer,Consumer

NameServer主要用来管理brokers以及路由信息。broker服务器启动时会注册到NameServer上,并且两者之间保持心跳监测机制,以此来保证NameServer知道broker的存活状态。而且,每一台NameServer都存有全部的broker集群信息和生产者/消费者客户端的请求信息。

Broker负责管理消息存储分发,主从数据同步,为消息建立索引,提供消息查询等能力。

2.3.2基本术语

:一个Topic可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的Topic发送消息。一个Topic也可以被0个、1个、多个消费者订阅;

Tag

:消息二级类型,可以为用户提供额外的灵活度,一条消息可以没有tag;

:消息生产者;

:存储消息,以Topic为纬度轻量级的队列;转发消息,单个Broker节点与所有的NameServer节点保持长连接及心跳,会定时将Topic信息注册到NameServer;

:消息消费者,负责接收并消费消息;

MessageQueue

:消息的物理管理单位,一个Topic可以有多个Queue,Queue的引入实现了水平扩展的能力;

NameServer

:负责对原数据的管理,包括Topic和路由信息,每个NameServer之间是没有通信的;

Group

:一个组可以订阅多个Topic,ProducerGroup、ConsumerGroup分别是一类生产者和一类消费者;

:通过Offset访问存储单元,RocketMQ中所有消息都是持久化的,且存储单元定长。Offset为JavaLong类型,理论上100年内不会溢出,所以认为MessageQueue是无限长的数据,Offset是下标;

:支持PUSH和PULL两种消费模式,支持集群消费和广播消费。

2.4RabbitMQ

2.4.1系统框架

RabbitMQ基于AMQP协议来实现,主要由Exchange和Queue两部分组成,然后通过RoutingKey关联起来,消息投递到Exchange然后通过Queue接收。

2.4.2基本术语

:接收客户端链接实体,实现AMQP消息队列和路由功能;

VirtualHost

:是一个虚拟概念,权限控制的最小单位。一个VirtualHost里包含多个Exchange和Queue;

Exchange

:接收消息生产者的消息并将消息转发到队列。发送消息时根据不同ExchangeType的决定路由规则,ExchangeType常用的有:direct、fanout和topic三种;

:消息队列,存储为被消费的消息;

Message

:由Header和Body组成,Header是生产者添加的各种属性,包含Message是否持久化、哪个MessageQueue接收、优先级。Body是具体的消息内容;

Binding

:Binding连接起了Exchange和MessageQueue。在服务器运行时,会生成一张路由表,这张路由表上记录着MessageQueue的条件和BindingKey值。当Exchange收到消息后,会解析消息中的Header得到BindingKey,并根据路由表和ExchangeType将消息发送到对应的MessageQueue。最终的匹配模式是由ExchangeType决定;

Connection

:在Broker和客户端之间的TCP连接;

Channel

:信道。Broker和客户端只有tcp连接是不能发送消息的,必须创建信道。AMQP协议规定只有通过Channel才能执行AMQP命令。一个Connection可以包含多个Channel。之所以需要建立Channel,是因为每个TCP连接都是很宝贵的。如果每个客户端、每个线程都需要和Broker交互,都需要维护一个TCP连接的话是机器耗费资源的,一般建议共享Connection。RabbitMQ不建议客户端线程之前共享Channel,至少保证同一Channel发小消息是穿行的;

Command

:AMQP命令,客户端通过Command来完成和AMQP服务器的交互。

2.5NSQ

2.5.1系统框架

NSQ主要有nsqlookup、nsqd两部分组成:

NSQ由3个守护进程组成:

3选型要点

3.1选型参考

3.2消息队列对比

注:作为LShift和CohesiveFT于2007年成立的合资企业,RabbitMQ于2010年4月被VMware旗下的SpringSource收购。

4功能剖析

4.1消费推拉模式

客户端消费者获取消息的方式,Kafka和RocketMQ是通过长轮询Pull的方式拉取消息,RabbitMQ、Pulsar、NSQ都是通过Push的方式。

pull类型的消息队列更适合高吞吐量的场景,允许消费者自己进行流量控制,根据消费者实际的消费能力去获取消息。而push类型的消息队列,实时性更好,但需要有一套良好的流控策略(backpressure)当消费者消费能力不足时,减少push的消费数量,避免压垮消费端。

4.2延迟队列

消息延迟投递,当消息产生送达消息队列时,有些业务场景并不希望消费者立刻收到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。延迟队列一般分为两种,基于消息的延迟和基于队列的延迟。基于消息的延迟指为每条消息设置不同的延迟时间,当队列有新消息进入的时候根据延迟时间排序,当然这样会对性能造成较大影响。另一种基于队列的延迟指的是设置不同延迟级别的队列,队列中每个消息的延迟时间都是相同的,这样免去了基于延迟时间排序对性能带来的损耗,通过一定的扫描策略即可投递超时的消息。

延迟消息的使用场景比如异常检测重试,订单超时取消等,例如:

Kafka不支持延迟消息。Pulsar支持秒级的延迟消息,所有延迟投递的消息会被DelayedMessageTracker记录对应的index,consumer在消费时,会先去DelayedMessageTracker检查,是否有到期需要投递的消息,如果有到期的消息,则从Tracker中拿出对应的index,找到对应的消息进行消费,如果没有到期的消息,则直接消费正常的消息。对于长时间的延迟消息,会被存储在磁盘中,当快到延迟间隔时才被加载到内存里。

RocketMQ开源版本延迟消息临时存储在一个内部主题中,不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。

RabbitMQ需要安装一个rabbitmq_delayed_message_exchange插件。

NSQ通过内存中的优先级队列来保存延迟消息,支持秒级精度,最多2个小时延迟。

4.3死信队列

由于某些原因消息无法被正确的投递,为了确保消息不会被无故的丢弃,一般将其置于一个特殊角色的队列,这个队列一般称之为死信队列。与此对应的还有一个“回退队列”的概念,试想如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认(Ack),进而发生回滚消息的操作之后消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常的处理提供的一种机制保障。实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。

Kafka没有死信队列,通过Offset的方式记录当前消费的偏移量。

Pulsar有重试机制,当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试Topic中,当重试达到一定次数后,停止重试,投递到死信Topic中。

RocketMQ通过DLQ来记录所有消费失败的消息。

RabbitMQ是利用类似于延迟队列的形式实现死信队列。

NSQ没有死信队列。

4.4优先级队列

有一些业务场景下,我们需要优先处理一些消息,比如银行里面的金卡客户、银卡客户优先级高于普通客户,他们的业务需要优先处理。如下图:

优先级队列不同于先进先出队列,优先级高的消息具备优先被消费的特权,这样可以为下游提供不同消息级别的保证。不过这个优先级也是需要有一个前提的:如果消费者的消费速度大于生产者的速度,并且消息中间件服务器(一般简单的称之为Broker)中没有消息堆积,那么对于发送的消息设置优先级也就没有什么实质性的意义了,因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。

Kafka、RocketMQ、Pulsar、NSQ不支持优先级队列,可以通过不同的队列来实现消息优先级。

RabbitMQ支持优先级消息。

4.5消息回溯

一般消息在消费完成之后就被处理了,之后再也不能消费到该条消息。消息回溯正好相反,是指消息在消费完成之后,还能消费到之前被消费掉的消息。对于消息而言,经常面临的问题是“消息丢失”,至于是真正由于消息中间件的缺陷丢失还是由于使用方的误用而丢失一般很难追查,如果消息中间件本身具备消息回溯功能的话,可以通过回溯消费复现“丢失的”消息进而查出问题的源头之所在。消息回溯的作用远不止与此,比如还有索引恢复、本地缓存重建,有些业务补偿方案也可以采用回溯的方式来实现。

Kafka支持消息回溯,可以根据时间戳或指定Offset,重置Consumer的Offset使其可以重复消费。

Pulsar支持按时间对消息进行回溯。

RocketMQ支持按时间回溯,实现的原理跟Kafka一致。

RabbitMQ不支持回溯,消息一旦标记确认就会被标记删除。

NSQ一般消息是不可回溯的,但可以通过nsq_to_file工具,将消息写入到文件,然后从文件里重放消息。

4.6消息持久化

流量削峰是消息中间件的一个非常重要的功能,而这个功能其实得益于其消息堆积能力。从某种意义上来讲,如果一个消息中间件不具备消息堆积的能力,那么就不能把它看做是一个合格的消息中间件。消息堆积分内存式堆积和磁盘式堆积。一般来说,磁盘的容量会比内存的容量要大得多,对于磁盘式的堆积其堆积能力就是整个磁盘的大小。从另外一个角度讲,消息堆积也为消息中间件提供了冗余存储的功能。

Kafka和RocketMQ直接将消息刷入磁盘文件中进行持久化,所有的消息都存储在磁盘中。只要磁盘容量够,可以做到无限消息堆积。

RabbitMQ是典型的内存式堆积,但这并非绝对,在某些条件触发后会有换页动作来将内存中的消息换页到磁盘(换页动作会影响吞吐),或者直接使用惰性队列来将消息直接持久化至磁盘中。

Pulsar消息是存储在BookKeeper存储集群上,也是磁盘文件。

NSQ通过nsq_to_file工具,将消息写入到文件。

4.7消息确认机制

消息队列需要管理消费进度,确认消费者是否成功处理消息,使用push的方式的消息队列组件往往是对单条消息进行确认,对于未确认的消息,进行延迟重新投递或者进入死信队列。

Kafka

通过Offset的方式确认消息。

1)发送方确认机制ack=0,不管消息是否成功写入分区ack=1,消息成功写入首领分区后,返回成功ack=all,消息成功写入所有分区后,返回成功。

2)接收方确认机制自动或者手动提交分区偏移量,早期版本的kafka偏移量是提交给Zookeeper的,这样使得zookeeper的压力比较大,更新版本的kafka的偏移量是提交给kafka服务器的,不再依赖于zookeeper群组,集群的性能更加稳定。

RocketMQ

与Kafka类似也会提交Offset,区别在于消费者对于消费失败的消息,可以标记为消息消费失败,Broker会重试投递,如果累计多次消费失败,会投递到死信队列。

RabbitMQ

和NSQ类似,消费者确认单条消息,否则会重新放回队列中等待下次投递。

1)发送方确认机制,消息被投递到所有匹配的队列后,返回成功。如果消息和队列是可持久化的,那么在写入磁盘后,返回成功。支持批量确认和异步确认。

2)接收方确认机制,设置autoAck为false,需要显式确认,设置autoAck为true,自动确认。当autoAck为false的时候,rabbitmq队列会分成两部分,一部分是等待投递给consumer的消息,一部分是已经投递但是没收到确认的消息。如果一直没有收到确认信号,并且consumer已经断开连接,rabbitmq会安排这个消息重新进入队列,投递给原来的消费者或者下一个消费者。未确认的消息不会有过期时间,如果一直没有确认,并且没有断开连接,rabbitmq会一直等待,rabbitmq允许一条消息处理的时间可以很久很久。

Pulsar

使用专门的Cursor管理。累积确认和Kafka效果一样;提供单条或选择性确认。

4.8消息TTL

消息TTL表示一条消息的生存时间,如果消息发出来后,在TTL的时间内没有消费者进行消费,消息队列会将消息删除或者放入死信队列中。

Kafka根据设置的保留期来删除消息。有可能消息没被消费,过期后被删除。不支持TTL。

Pulsar支持TTL,如果消息未在配置的TTL时间段内被任何消费者使用,则消息将自动标记为已确认。消息保留期与消息TTL之间的区别在于:消息保留期作用于标记为已确认并设置为已删除的消息,而TTL作用于未ack的消息。上面的图例中说明了Pulsar中的TTL。例如,如果订阅B没有活动消费者,则在配置的TTL时间段过后,消息M10将自动标记为已确认,即使没有消费者实际读取该消息。

RocketMQ提及到消息TTL的资料比较少,不过看接口似乎是支持的。

RabbitMQ有两种方式,一个是声明队列的时候在队列属性中设置,整个队列中的消息都有相同的有效期。还可以发送消息的时候给消息设置属性,可以位每条消息都设置不同的TTL。

NSQ似乎还没支持,有一个FeatureRequest的Issue处于Open状态。

4.9多租户隔离

多租户是指通过一个软件实例为多个租户提供服务的能力。租户是指对系统有着相同“视图”的一组用户。不支持多租户的系统里边,往往要为不同用户或者不同集群创建多个消息队列实例实现物理隔离,这样会带来较高的运维成本。作为一种企业级的消息系统,Pulsar的多租户能力按照设计可满足下列需求:

Pulsar通过下列方式满足了上述需求:

以策略的方式定义所有隔离机制,策略可在运行过程中更改,借此降低运维成本并简化管理工作。

4.10消息顺序性

消息顺序性是指保证消息有序。消息消费顺序跟生产的顺序保持一致。

Kafka保证了分区内的消息有序。

Pulsar支持两种消费模式,独占订阅的流模式只保证了消息的顺序性,共享订阅队列模型不保证有序性。

RocketMQ需要用到锁来保证一个队列同时只有一个消费者线程进行消费,保证消息的有序性。

RabbitMQ顺序性的条件比较苛刻,需要单线程发送、单线程消费,并且不采用延迟队列、优先级队列等高级功能。

NSQ是利用了golang自身的case/select实现的消息分发,本身不提供有序性保障,不能够把特性消息和消费者对应起来,无法实现消息的有序性。

4.11消息查询

在实际开发中,经常要查看MQ中消息的内容,比如通过某个MessageKey/ID,查询到MQ的具体消息。或者是对消息进行链路追踪,知道消息从哪里来,发送到哪里去,进而快速对问题进行排查定位。

Kafka存储层是以分布式提交日志的形式实现,每次写操作都顺序追加到日志的末尾。读也是顺序读。不支持检索功能。

Pulsar可以通过消息ID,查询到具体某条消息的消息内容、消息参数和消息轨迹。

RocketMQ支持按MessageKey、UniqueKey、MessageId对消息进行查询。

RabbitMQ使用基于索引的存储系统。这些将数据保存在树结构中,以提供确认单个消息所需的快速访问。由于RabbitMQ的消息在确认后会被删除,因此只能查询未确认的消息。

NSQ自身不支持消息持久化和消息检索,不过可以使用nsq_to_http等工具将消息写入可支持索引的存储里。

4.12消费模式

Kafka有两种消费模式,最终都会保证一个分区只有1个消费者在消费:

Pulsar有以下四种消费模式,其中独占模式和灾备模式跟Kafka类似,为流模型,每个分区只有1个消费者消费,能保证消息有序性。共享模式和Key共享模式为队列模型,多个消费者能提高消费速度,但不能保证有序性。

RocketMQ有两种消费模式,BROADCASTING广播模式,CLUSTERING集群模式。

广播消费指的是:一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。

集群消费模式:一个ConsumerGroup中的Consumer实例平均分摊消费消息。例如某个Topic有9条消息,其中一个ConsumerGroup有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中部分,消费完的消息不能被其他实例消费。

RabbitMQ和NSQ的消费比较类似,都是跟Pulsar共享模式类似的,队列的形式,增加一个消费者组里的消费者数量能提高消费速度。

4.13消息可靠性

消息丢失是使用消息中间件时所不得不面对的一个同点,其背后消息可靠性也是衡量消息中间件好坏的一个关键因素。尤其是在金融支付领域,消息可靠性尤为重要。比如当服务出现故障时,一些对于生产者来说已经生产成功的消息,是否会在高可用切换时丢失。同步刷盘是增强一个组件可靠性的有效方式,消息中间件也不例外,Kafka和RabbitMQ都可以支持同步刷盘,但绝大多数情景下,一个组件的可靠性不应该由同步刷盘这种极其损耗性能的操作来保障,而是采用多副本的机制来保证。

Pulsar有跟Kafka类似的概念,叫AckQuorumSize(Qa),Qa是每次写请求发送完毕后需要回复确认的Bookie的个数,其数值越大则需要确认写成功的时间越长,其值上限是副本数Qw。为了一致性,Qa应该是:(Qw 1)/2或者更,即为了确保数据安全性,Qa下限是(Qw 1)/2。

RocketMQ与Kafka类似。

RabbitMQ是主从架构,通过镜像环形队列实现多副本及强一致性语义的。多副本可以保证在master节点宕机异常之后可以提升slave作为新的master而继续提供服务来保障可用性。

NSQ会通过go-diskqueue组件将消息落盘到本地文件中,通过mem-queue-size参数控制内存中队列大小,如果mem-queue-size=0每条消息都会存储到磁盘里,不用担心节点重启引起的消息丢失。但由于是存储在本地磁盘中,如果节点离线,堆积在节点磁盘里的消息会丢失。

4.14负载均衡

:支持负载均衡。一个broker通常就是一台服务器节点。对于同一个Topic的不同分区,Kafka会尽力将这些分区分布到不同的Broker服务器上,zookeeper保存了broker、主题和分区的元数据信息。分区首领会处理来自客户端的生产请求,kafka分区首领会被分配到不同的broker服务器上,让不同的broker服务器共同分担任务。

每一个broker都缓存了元数据信息,客户端可以从任意一个broker获取元数据信息并缓存起来,根据元数据信息知道要往哪里发送请求。

kafka的消费者组订阅同一个topic,会尽可能地使得每一个消费者分配到相同数量的分区,分摊负载。

当消费者加入或者退出消费者组的时候,还会触发再均衡,为每一个消费者重新分配分区,分摊负载。

kafka的负载均衡大部分是自动完成的,分区的创建也是kafka完成的,隐藏了很多细节,避免了繁琐的配置和人为疏忽造成的负载问题。

发送端由topic和key来决定消息发往哪个分区,如果key为null,那么会使用轮询算法将消息均衡地发送到同一个topic的不同分区中。如果key不为null,那么会根据key的hashcode取模计算出要发往的分区。

rabbitmq

:对负载均衡的支持不好。消息被投递到哪个队列是由交换器和key决定的,交换器、路由键、队列都需要手动创建。

rabbitmq客户端发送消息要和broker建立连接,需要事先知道broker上有哪些交换器,有哪些队列。通常要声明要发送的目标队列,如果没有目标队列,会在broker上创建一个队列,如果有,就什么都不处理,接着往这个队列发送消息。假设大部分繁重任务的队列都创建在同一个broker上,那么这个broker的负载就会过大。(可以在上线前预先创建队列,无需声明要发送的队列,但是发送时不会尝试创建队列,可能出现找不到队列的问题,rabbitmq的备份交换器会把找不到队列的消息保存到一个专门的队列中,以便以后查询使用)

使用镜像队列机制建立rabbitmq集群可以解决这个问题,形成master-slave的架构,master节点会均匀分布在不同的服务器上,让每一台服务器分摊负载。slave节点只是负责转发,在master失效时会选择加入时间最长的slave成为master。

当新节点加入镜像队列的时候,队列中的消息不会同步到新的slave中,除非调用同步命令,但是调用命令后,队列会阻塞,不能在生产环境中调用同步命令。

当rabbitmq队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。

这种方式非常适合扩展,而且是专门为并发程序设计的。

如果某些消费者的任务比较繁重,那么可以设置basicQos限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,rabbitmq不再向这个消费者发送任何消息。

对于rabbitmq而言,客户端与集群建立的TCP连接不是与集群中所有的节点建立连接,而是挑选其中一个节点建立连接。

但是rabbitmq集群可以借助HAProxy、LVS技术,或者在客户端使用算法实现负载均衡,引入负载均衡之后,各个客户端的连接可以分摊到集群的各个节点之中。

客户端均衡算法:

zeromq

:去中心化,不支持负载均衡。本身只是一个多线程网络库。

rocketmq

:支持负载均衡。一个broker通常是一个服务器节点,broker分为master和slave,master和slave存储的数据一样,slave从master同步数据。

nameserver与每个集群成员保持心跳,保存着Topic-Broker路由信息,同一个topic的队列会分布在不同的服务器上。

发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。发送消息指定topic、tags、keys,无法指定投递到哪个队列(没有意义,集群消费和广播消费跟消息存放在哪个队列没有关系)。

tags选填,类似于Gmail为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个tag,所以也可以类比为Notify的MessageType概念。

keys选填,代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以在Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品Id等。

rocketmq的负载均衡策略规定:Consumer数量应该小于等于Queue数量,如果Consumer超过Queue数量,那么多余的Consumer将不能消费消息。这一点和kafka是一致的,rocketmq会尽可能地为每一个Consumer分配相同数量的队列,分摊负载。

activemq

:支持负载均衡。可以基于zookeeper实现负载均衡。

4.15集群方式

:天然的‘Leader-Slave’无状态集群,每台服务器既是Master也是Slave。分区首领均匀地分布在不同的kafka服务器上,分区副本也均匀地分布在不同的kafka服务器上,所以每一台kafka服务器既含有分区首领,同时又含有分区副本,每一台kafka服务器是某一台kafka服务器的Slave,同时也是某一台kafka服务器的leader。

kafka的集群依赖于zookeeper,zookeeper支持热扩展,所有的broker、消费者、分区都可以动态加入移除,而无需关闭服务,与不依靠zookeeper集群的mq相比,这是最大的优势。

:支持简单集群,'复制'模式,对高级集群模式支持不好。

rabbitmq的每一个节点,不管是单一节点系统或者是集群中的一部分,要么是内存节点,要么是磁盘节点,集群中至少要有一个是磁盘节点。

在rabbitmq集群中创建队列,集群只会在单个节点创建队列进程和完整的队列信息(元数据、状态、内容),而不是在所有节点上创建。

引入镜像队列,可以避免单点故障,确保服务的可用性,但是需要人为地为某些重要的队列配置镜像。

:去中心化,不支持集群。

:常用多对'Master-Slave'模式,开源版本需手动切换Slave变成Master

NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。

Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

客户端先找到NameServer,然后通过NameServer再找到Broker。

一个topic有多个队列,这些队列会均匀地分布在不同的broker服务器上。rocketmq队列的概念和kafka的分区概念是基本一致的,kafka同一个topic的分区尽可能地分布在不同的broker上,分区副本也会分布在不同的broker上。

rocketmq集群的slave会从master拉取数据备份,master分布在不同的broker上。

:支持简单集群模式,比如'主-备',对高级集群模式支持不好。

5性能

Kafka的公司Confluent在2020年8月发了一篇BenchmarkingApacheKafka,ApachePulsar,andRabbitMQ:WhichistheFastest?文章,并且提出了一个开源的MQBenchmark框架THEOPENMESSAGINGBENCHMARKFRAMEWORK,在这个文档里,对比了Kafka、Pulsar、RabbitMQ的吞吐量、端到端延迟等性能数据。最后得出结论Kafka相对来说性能最好。

但接下来StreamNative在2020年12月指出了Confluence的基准测试的一些问题,并对Pulsar进行了参数调优之后重新执行了一遍结果,测试报告展示Pulsar能达到跟Kafka同样的吞吐量,在某些场景下,Pulsar的延迟显著低于Kafka。

而且在性能测试上,有很多客户端、服务端参数设置、机器性能配置等影响,比如消息可靠性级别,压缩算法等,很难做到“完全”控制变量公平的测试。而且OpenMessagingBenchmark的开源Github的Readme上也提到了。

不过有几个关注点:

Pulsar和Kafka都被广泛用于各个企业,也各有优势,都能通过数量基本相同的硬件处理大流量。部分用户误以为Pulsar使用了很多组件,因此需要很多服务器来实现与Kafka相匹敌的性能。这种想法适用于一些特定硬件配置,但在多数资源配置相同的情况中,Pulsar的优势更加明显,可以用相同的资源实现更好的性能。举例来说,Splunk最近分享了他们选择Pulsar放弃Kafka的原因,其中提到“由于分层架构,Pulsar帮助他们将成本降低了30%-50%,延迟降低了80%-98%,运营成本降低了33%-50%”。Splunk团队发现Pulsar可以更好地利用磁盘IO,降低CPU利用率,同时更好地控制内存。

在分布式系统里,单机性能指标虽然也很重要,分布式系统整体的性能以及灵活扩缩容、高可用容灾等能力也会是评估的一个重要参考。MQ中间件具体的性能指标,也需要我们自己根据实际的情况,根据实际购买的集群配置和客户端参数,进行压测调优来评估。

6运维

在使用过程中难免会出现各种异常情况,比如宕机、网络抖动、扩容等。消息队列具备异地容灾,高可用架构等能力,能避免一些计算节点、网络等基础设施不可用导致的故障。

6.1高可用

Kafka通过分区多副本的方式解决高可用问题。

Pulsar的计算集群Broker是无状态的,可以灵活扩缩容,存储节点Bookie上通过消息分区分片副本的方式,每个分片都有一个或多个副本,保证在某一个Bookie挂掉后,有其他分片可以提供服务。

RocketMQ和RabbitMQ都是主从架构,当master挂掉后,由原来的从节点继续提供服务。备机提供消费服务,保证消息不丢,但不提供写服务。

NSQ是类似分布式架构,不过由于消息存储是在节点本地磁盘上,如果一个节点离线,堆积在节点磁盘上的消息会丢失。

6.2跨地域容灾

Pulsar原生支持跨地域容灾功能,在这个图中,每当P1、P2和P3的生产者分别向Cluster-A、Cluster-B和Cluster-C中的T1topic发送消息时,这些消息很快在不同的集群中复制。一旦消息完成复制,消费者C1和C2会从各自的集群消费到这个消息。

在这个跨地域容灾的设计支撑下,其一,我们可以比较容易的将服务分散到多个机房;其二,可以应对机房级别的故障,即在一个机房不可用的情况下,服务可以转接到其它的机房来继续对外提供服务。

一句话概括,Pulsar的跨地域复制,其实就是在一个本地集群中创建一个Producer,把异地的集群作为这个Producer的发送地址,将本地集群的消息发送过去,并且在本地维护一个Cusor来保证消息可靠性和幂等性。

6.3集群扩容

当消息量突然上涨,消息队列集群到达瓶颈的时候,需要对集群进行扩容,扩容一般分为水平扩容和垂直扩容两种方式,水平扩容指的是往往集群中增加节点,垂直扩容指的是把集群中部分节点的配置调高,增加处理能力。

Kafka集群由于主题分区是物理存储在Broker节点上的,新加入的集群的节点并没有存储分区分片,也就无法提供马上提供服务,因此需要把一些Topic的分区分配到新加入的节点里,这里会涉及到一个分区数据均衡的过程,将某些分区的数据复制到新节点上。这个过程跟分区当前堆积的数据量、Broker性能有关,有可能会出现由于源Broker负载过高,堆积数据过大,导致数据均衡的时间变长。

Pulsar的无限分布式日志以分片为中心,借助扩展日志存储(通过ApacheBookKeeper)实现,内置分层存储支持,因此分片可以均匀地分布在存储节点上。由于与任一给定topic相关的数据都不会与特定存储节点进行捆绑,因此很容易替换存储节点或缩扩容。另外,集群中最小或最慢的节点也不会成为存储或带宽的短板。

RocketMQ新节点直接加入到集群中,在新的broker创建新topic并且分配队列,或者在已有topic基础上分配队列。与Kafka的区别是,Kafka的分区是在不同的物理机器上,而Rocketmq是逻辑分区,用的队列形式,因此不存在出现数据不均衡的情况。

RabbitMQ和NSQ类似,由于不涉及过多的消息持久化,直接往集群中增加节点。

6.4使用成本

Kafka/Pulsar/RocketMQ/RabbitMQ在腾讯云上都上线了标准产品,可以直接购买创建实例(),能大大降低部署运维成本。而NSQ目前暂时还没有上线,需要自行部署。

7常见问题&使用场景

7.1Kafka

7.2RocketMQ

7.3普通消息

消息队列最基础的功能就是生产者发送消息、Broker保存消息,消费者来消费消息,以此实现系统解耦、削峰填谷的作用。

普通消息是消息队列必备的消息类型,也是系统使用场景最多的一种消息。

7.4顺序消息

顺序消息是指生产者发送消息的顺序和消费者消费消息的顺序是一致的。比如在一个电商场景,同一个用户提交订单、订单支付、订单出库,这三个消息消费者需要按照顺序来进行消费。如下图:

顺序消息的实现并不容易,原因如下:

要保证消息有序,需要满足两个条件:

如下图:

上面第二个条件是比较容易实现的,一个分区绑定一个消费者就可以,主要是第一个条件。

在主流消息队列的实现中,Kafka和Pulsar的实现方式类似,生产者给消息赋值一个key,对key做Hash运算来指定消息发送到哪一个分区。比如上面电商的例子,对同一个用户的一笔订单,提交订单、订单支付、订单出库这三个消息赋值同一个key,就可以把这三条消息发送到同一个分区。

对于RocketMQ,生产者在发送消息的时候,可以通过MessageQueueSelector指定把消息投递到那个MessageQueue,如下图:

7.5延时消息

或者也叫定时消息,是指消息发送后不会立即被消费,而是指定一个时间,到时间后再消费。经典的场景比如电商购物时,30分钟未支付订单,让订单自动失效。

7.5.1RocketMQ实现

RocketMQ定义了18个延时级别,每个延时级别对应一个延时时间。下面如果延迟级别是3,则消息会延迟10s才会拉取。

RocketMQ的延时消息如下图:

生产者把消费发送到Broker后,Broker首先把消息保存到SCHEDULE_TOPIC_XXXX这个Topic,然后调度任务会判断是否到期,如果到期,会把消息从SCHEDULE_TOPIC_XXXX取出投递到原始的queue,这样消费者就可以消费到了。

RocketMQ的延时消息只支持最大两个小时的延时,不过RocketMQ5.0基于时间轮算法实现了定时消息,解决了这个问题。

7.5.2Pulsar实现

Pulsar的实现如下图:

Pulsar的延时消息首先会写入一个DelayedMessageTracker的数据结构中,DelayedMessageTracker根据延时时间构建delayedindex优先级队列。消费者拉取消息时,首先去DelayedMessageTracker检查是否有到期的消息。如果有则直接拉取进行消费。

7.5.4RabbitMQ实现

RabbitMQ的实现方式有两种,一种是投递到普通队列都不消费,等消息过期后被投递到死信队列,消费者消费死信队列。如下图:

第二种方式是生产者发送消息时,先发送到本地Mnesia数据库,消息到期后定时器再将消息投递到broker。

7.5.4Kafka实现

Kafka本身并没有延时队列,不过可以通过生产者拦截器来实现消息延时发送,也可以定义延时Topic,利用类似RocketMQ的方案来实现延时消息。

7.6事务消息

事务消息是指生产消息和消费消息满足事务的特性。

RabbitMQ和Kafka的事务消息都是只支持生产消息的事务特性,即一批消息要不全部发送成功,要不全部发送失败。

RabbitMQ通过Channel来开启事务消息,代码如下:

Kafka可以给多个生产者设置同一个事务ID,从而把多个Topic、多个Partition放在一个事务中,实现原子性写入。

Pulsar的事务消息对于事务语义的定义是:允许事件流应用将消费、处理、生产消息整个过程定义为一个原子操作。可见,Pulsar的事务消息可以覆盖消息流整个过程。

RocketMQ的事务消息是通过half消息来实现的。以电商购物场景来看,账户服务扣减账户金额后,发送消息给Broker,库存服务来消费这条消息进行扣减库存。如下图:

可见,RocketMQ只能保证生产者发送消息和本地事务的原子性,并不能保证消费消息的原子性。

7.7轨迹消息

轨迹消息主要用于跟踪消息的生命周期,当消息丢失时可以很方便地找出原因。

轨迹消息也跟普通消息一样,也需要存储和查询,也会占用消息队列的资源,所以选择轨迹消息要考虑下面几点:

RocketMQ生产者、Broker和消费者都实现了轨迹消息,不过默认是关闭的,需要手工开启。

使用轨迹消息,需要考虑记录哪些节点、存储介质、性能、查询方式等问题。

7.8Kafka是否会消息丢失?

生产者丢失数据

消费者程序丢失数据

7.9Kafka如何持久化?

定期删除消息(日志段)

7.10Kafka文件存储机制

segmentfile文件命名规则:

一对segmentfile

message物理结构

7.11Kafka分区

为什么分区?

分区策略?

7.12MQ消息堆积问题处理

消息堆积可能的原因:

队列中消息不能被及时的消费,导致大量堆积在队列里面rocketMqKafkaRabbitMq都会有这样的问题产生消息堆积的可以从mq的生产消费模型去考虑,从生产者到消息中间件、再到消费者,都会发生堆积消费者

:消费者处理速度过慢,或者消费者故障、延迟,无法即使的处理消息,导致消息堆积生产者:

生产者产生速度过快,消费者无法即使处理MQ消息队列

:Mq服务器的性能不足,比如它所在的机器,cpu、内存、磁盘等超载,无法即使的处理消息,导致消息堆积其他

:其他方面也会有这样的问题,比如网络故障,连接问题,消息在传递过程中过慢,从而导致消息堆积业务方面,消息消费失败重试,不断的重试,没有设置重试次数,导致消息堆积!。

处理消息堆积问题:

一、消费者:

1、增加消费者的数量

,提高消费的处理速度;(注意这个不通用

,只适合RabbitMq)需要注意不能一味的水平扩展消费者

因为其他关键链路性能是否抗的住大量的水平扩展,比如mysq、redis,详细见下方rabbitmq消息堆积解决方案2、或者提高消费者的处理能力

,比如通过并发处理、异步处理提高消费者吞吐量。这个则要注意通过线程池、队列,把mq拉到程序的队列中,要承担对应的宕机导致消息丢失

风险。

二、MQ消息队列:

增加MQ的服务器资源,cpu、内存、磁盘,提高mq处理能力也可以通过分区队列将消息分散到多个队列中,提高整体的处理能力。(这个则是Kafka、Rocket采用的

控制队列容量,避免堆积过多,设置持久化策略。

rabbitMQ的懒加载队列

,兼顾了持久化和堆积上限

三、监控告警(重要)

设置监控系统,比如普罗米修斯,监控消息数量,消费者处理速度,队列状态等等,在堆积发生前,即使的告警,及时采取措施。

But

上面的策略是通用的一些解决方案,不同的MQ,生产消费模型是不一样的,导致需要针对不同mq的消息堆积解决方案不一样。

RabbitMq、Kafka、RocketMq发生消息堆积,分别该如何去解决?

这里先点一下,增加消费者数量,并不是通用的,只适合RabbitMq

8总结

Kafka与Pulsar都是腾讯云主打的消息队列中间件,都具有高性能,高可靠,支持多种场景。Kafka推出的时间较早,各种场景比如日志、大数据处理等都有较成熟的解决方案。而Pulsar作为一个新秀,支持的功能比CKafka更丰富,而且跨地域容灾,多租户等功能,解决了很多Kafka设计缺陷和运维成本问题,整体稳定性更强。很多国内外大公司也有很多Pulsar的实践案例。因此,消息队列选型看这一篇就够了一些传统的日志、大数据处理等场景,对高吞吐量有要求的,对消息可靠性的要求没那么高的,可以选用Kafka,有很多优秀的文档说明怎么参数调优提高性能。而一些对消息可靠性、容灾要求更好,或者有高分区、延迟队列等需求的场景,可以选用Pulsar。

我们后台的技术栈是基于Golang的,在上文的对比中,还挑了一个基于Golang开发的消息队列NSQ,如果有一些定制化需求或者需要二次开发的,可以选用NSQ。也可以通过阅读NSQ的源码,学习一些优秀高性能消息队列中间件的实现方式,比如里边diskqueue组件,一个基于磁盘的消息队列,在某些场景下可能也可以进行二次利用。

9参考文献

欢迎关注腾讯程序员的直播

标签: 消息队列选型看这一篇就够了

卓越科技网 网站地图 免责声明:本网站部分内容由用户自行上传,若侵犯了您的权益,请联系我们处理,谢谢!联系QQ:2760375052 版权所有:卓越科技网 沪ICP备2023023636号-5