logo头像

生活多彩,尽情享受!

RocketMQ基本概念

一、专业术语
● Producer

消息生产者,负责生产消息,一般由业务系统负责产生消息。

● Consumer

消息消费者,负责消费消息,一般是后台系统负责异步消费。

● Push Consumer

Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦接受到消息,Consumer对象立刻回调Listener接口方法。

● Pull Consumer

Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broken拉消息,主动权由应用控制。

● Producer Group

一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致。

● Consumer Group

一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。

● Broker

消息中转角色,负责转储消息,一般也称为Server。在JMS规范中称为Provider。

● 广播消费

一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消费也会被Group中的每一个Consumer消费一次,广播消费中的Consumer Group概念可以认为在消息划分方面无意义。

在CORBA Notification规范中,消费方式都属于广播消费。

● 集群消费

一个Consumer Group中的Consumer实例平均分摊消费信息。例如某个Topic有9条信息,其中一个Consumer Group有三个实例,那么每个实例只消费其中的三条信息。

● 顺序消息

消费消息的顺序要同发送消息的顺序一致,在RocketMQ中,主要指局部顺序,即一类消息为满足顺序性,必须Producer单线顺序发送,且发送到同一个队列,这样Consumer就可以按照Producer发送的顺序去消费信息。

● 普通顺序消息

顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一 旦发生通信异常,Broker重启,由于队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息队列不一致。

如果业务能容忍在集群异常情况(如某个Broker宕机或者重启)下,消息短暂的乱序,使用普通顺序消息比较合适。

● 严格顺序消息

顺序消息的一种,无论正常异常情况下都能保证顺序,但是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。

● Message Queue

在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,Offset为java long类型,64位,理论上在100内不会溢出,所以认为是长度无限,另队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。

也可以认为Message Queue是一个长度无限的数字,Offset就是下标。

二、消息中间件需要解决的问题

  1. Publish/Subscribe

发布订阅是消息中间件的最基础功能,相对于传统的RPC通信而言。

  1. Message Priority

规范中描述的优先级是指在一个消息队列中,每条消息都有不同的优先级,一般用整数来描述,优先级高的消息先投递,如果消息完全在一个内存队列中,那么在投递前可以按照优先级排序,令优先级高的先投递。

由于RocketMQ所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大。因此RocketMQ没有特意支持消息优先级,但是可以通过变通的方式实现类似功能,即单独配置一个优先级高的队列,和一个普通优先级的队列,将不同优先级发送到不同队列即可。

  1. Message Order

消息有序指一类消息消费时,能按照发送的顺序来消费。

RocketMQ可以严格的保证消息有序。

  1. Message Filter

4.1.1. Broker端消息过滤

在Broker中,按照Consumer的要求做过滤,优点是减少了对于Consumer无用消息的网络传输。缺点是增加了Broker对的负担,实现相对复杂。

4.1.2. Consumer端消息过滤

可以由应用自定义实现,但是会有很多无用的消息传递到Consumer端。

  1. Message Persistence

消息中间件通常采用的几种持久化方式:

(1) 持久化到数据库,如Mysql。

(2) 持久化到KV存储,例如levelDB、伯克利DB等KV存储系统。

(3) 文件记录形式持久化,例如Kafka、RocketMQ

(4) 对内存数据做一个持久化镜像,例如beanstalkd、VisiNotify

(1)(2)(3)三种持久化方式都具有将内存队列Buffer进行扩展的能力,(4)只是一个内存的镜像,作用是当Broker挂掉重启后仍然能将之前内存的数据恢复出来。

  1. Message Reliable

(1) Broker正常关闭

(2) Broker异常Crash

(3) OS Crash

(4) 机器断电,但是能立即恢复供电的情况

(5) 机器无法开机(可能是cpu、主板、内存等关键设备损坏)

(6) 磁盘设备损坏

前四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或丢失少量数据(依赖刷盘方式是同步还是异步)

后两种属于单点故障,且无法恢复。一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免,但是势必会影响到性能,适合对消息可靠性要求极高的场合。

  1. Low Latency Messaging

在消息不堆积的情况下,消息到达Broker后,能立刻到达Consumer。

RocketMQ使用长轮询Pull方式,可保证非常实时,消息实时性不低于Push。

  1. At Least Once

指每个消息必须投递一次。

RocketMQ Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好支持此特性。

  1. Exactly Only Once

(1)发送消息阶段,不允许发送重复的消息。

(2)消费消息阶段,不允许消费重复的消息。

只有以上两个条件都满足的情况下,才能认为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以RocketMQ为了追求高性能,并不保证此特性,要求在业务上进行去重。RocketMQ虽然不能严格保证不重复,但是正常情况下很少会出现重复发送、消费情况,只有网络异常,Consumer启停等异常情况下才会出现消息重复。

该问题的本质原因是网络调用存在不确定性,即既不成功也不失败的第三种状态,所以产生了消息重复性问题。

  1. Broker的Buffer满了怎么办?

Broker的Buffer通常指Broker中一个队列的内存Buffer大小,这类Buffer通常大小有限,如果满了之后应该如何处理:

(1)RejectNewEvens

拒绝新来的消息,向Producer返回RejectNewEvens错误码

(2)按照特定策略丢弃已有消息

A)AnyOrder

B)FifoOrder

C)LifoOrder

D)PriorityOrder

E)DeadLineOrder

RocketMQ没有内存Buffer概念,RocketMQ的队列都是持久化到磁盘,数据定期删除。

对于这个问题的解决思路,RocketMQ同其他MQ有非常显著的区别,RocketMQ的内存Buffer抽象成一个无限长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker会定期删除过期的数据。

  1. 回溯消费

回溯消费是指Consumer已经成功消费的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度。

RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

  1. 消息堆积

消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,消息堆积分一下两种情况:

(1)消息堆积在内存Buffer中,一旦超过内存Buffer,可以根据一定的丢弃策略来丢弃消息。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存Buffer的大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。

(2)消息堆积到持久化存储系统中,例如DB,KV存储,文件记录形式。

当消息不能在内存cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力。

  1. 分布式事务

分布式事务涉及到两阶段提交问题,在数据存储方面必然需要KV存储的支持,因为第二阶段的提交回滚需要修改消息状态,一定涉及到根据key去查找message的动作。RocketMQ在第二阶段绕过了根据key去查找message的问题,采用第一阶段发送Prepare消息时。拿到了消息的Offset,第二阶段通过Offset去访问消息,并修改状态,Offset就是数据的地址。

RocketMQ这种实现事务的方式,没有通过KV存储做,而是通过Offset方式,存在一个显著缺陷,即通过Offset更改数据,会令系统的脏页过多,需要特别关注。

  1. 定时消息

定时消息是指消息发送到Broker后,不能立刻被Consumer消费,需要到特定的时间点或者特定的时间后才能被消费。

如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果在涉及到持久化,那么消息排序要不可避免的产生巨大的性能开销。

RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。

  1. 消息重试

Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有一下几种情况:

(1)由于消息本身的原因,例如反序列化失败,消息数据本身无法处理等。这种错误通常需要跳过这条消息,再消费其他消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10s后再重试。

(2)由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失效的信息,消费其他信息同样也会报错,这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。

三、什么是RocketMQ,以及有什么样的特点
RocketMQ是个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。

·Producer、Consumer、队列都可以分布式

·Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个Consumer实例消费这个Topic对应的所有队列;如果做集群消费,则多个Consumer平均消费这个Topic对应的队列集合。

·能够保证严格的消息顺序

·提供非常丰富的消息拉取模式

·高效的订阅者水平扩展能力

·实时的消息订阅机制

·亿级消息堆积能力

·较少的依赖

消息中间件:利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,可以在分布式环境下扩展进程件的通信。

四、RocketMQ网络架构

n Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步

n Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Name Server不同的BrokerId来定义,BrokerId为0表示Master。非0表示Slave。Master也可以部署多个,每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。

n Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

n Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

五、RocketMQ存储特点

  1. 零拷贝原理

Consumer消费消息过程,使用了零拷贝,零拷贝包含以下两种方式:

A)使用mmap+write方式

优点:即使频繁调用,使用小块文件传输,效率也很高

缺点:不能很好利用DMA方式,会比sendfile多消耗CPU,内存安全性控制复杂,需要避免JVM Crash问题。

B)使用sendfile方式

优点:可以利用DMA方式,消耗CPU较少,大块文件传输效率高,无内存安全性问题。

缺点:小块文件效率低于mmap+write方式,只能是BIO方式传输,不能使用NIO。

RocketMQ选择了第一种方式,因为有小块数据传输的需求。

  1. 文件系统

RocketMQ选择Linux Ext4文件系统,原因如下;

Ext4文件系统删除1G大小的文件通常耗时小于50ms,而Etx3文件系统耗时约1s左右,且删除文件时,磁盘IO压力极大,会导致IO写入超时。

文件系统层面需要做一下调优措施

文件系统IO调度算法需要调整为deadline,因为deadline算法在随机读情况下,可以合并读请求为顺序跳跃方式,从而提高读IO吞吐量。

  1. 数据存储结构
  1. 存储目录结构
  1. 数据可靠性

六、RocketMQ的关键特性

  1. 单机支持1万以上持久化队列

(1)所有数据单独存储到一个Commit Log,完全顺序写,随机读。

(2)对最终用户展现的队列实际只存储消息在Commit Log的位置信息,并且串行方式刷盘。

这样做的好处有:

A)队列轻量化,单个队列数据量非常小。

B)对磁盘的访问串行化,避免磁盘竞争,不会因为队列增加导致IOWAIT增高。

缺点是:

A) 写虽然是顺序写,但是读变成了完全的随机读

B) 读一条消息,会先读Consumer Queue,再读Commit Log,增加了开销

C) 要保证Consumer和Commit Log完全一致,增加了编程的复杂度。

对缺点的解决方案:

A) 随机读,尽可能让读命中PAGECACHE,减少IO读操作,所以内存越大越好。如果系统中堆积的消息过多,读数据访问磁盘将不会由于随机读导致系统性能急剧下降。

B) 由于Consumer Queue存储数据量极少,而且是顺序读,在PAGECACHE预读作用下,Consumer Queue的读性能几乎于内存一致,即使堆积情况下。所以可以认为Consumer Queue完全不会阻碍读性能。

C) Commit Log中存储了所有的元信息,包含消息体,类似于MySQL、Oracle的redolog,所以只要有Commit Log在,Consumer Queue即使数据丢失,仍然可以恢复出来。

  1. 刷盘策略

RocketMQ的所有消息都是持久化的,先写入系统PAGECACHE,然后刷盘,可以保证内存与磁盘都有一份数据,访问时,直接从内存读取。

(1) 异步刷盘

A) 由于磁盘速度大于网卡速度,那么刷盘的进度肯定可以跟上消息的写入速度

B) 万一由于此时系统压力过大,可能堆积消息,除了写入IO,还有读取IO,万一出现磁盘读取落后的情况,系统内存也不会溢出。

(2) 同步刷盘

与异步刷盘唯一的区别是异步刷盘写完PAGECACHE后直接返回,而同步刷盘需要等待刷盘完成后才返回,具体步骤如下:

写入PAGECACHE后,线程等待,通知刷盘线程刷盘。

刷盘线程刷完盘后,唤醒前端等待线程。

前端等待线程向用户返回成功。

  1. 消息查询

(1) 按照MessageId查询信息

MsgId总共16个字节,包含消息存储主机地址,消息Commit Log Offset。从MsgId中解析出Broker的地址和Commit Log的偏移地址,然后按照存储格式所在位置消息buffer解析成一个完整的消息。

(2) 按照Message Key查询信息

a) 根据查询的key的hashcode%slotNum得到具体的槽的位置(slotNum是一个索引文件里面包含的最大槽的数目)

b) 根据slotValue(slot位置对应的值)查找到索引项列表的最后一项

c) 遍历索引项列表返回查询时间内的结果集

d) 处理Hash冲突

e) 存储

  1. 服务器消息过滤

RocketMQ的消息过滤方式有别于其他消息中间件,是在做订阅时,再进行过滤。

这样处理的原因在于:

(1) Message Tag存储hashcode,是为了在Consumer Queue定长方式存储,节省空间

(2) 过滤过程中不会访问Commit Log数据,可以保证堆积情况下也能高效过滤

(3) 即使存在hash冲突,也可以在Consumer端进行修正,保证万无一失。

  1. 长轮询Pull

RocketMQ的Consumer都是从Broker拉消息来消费,为了能够做到实时接受消息,RocketMQ使用长轮询方式,可以保证消息实时性和Push方式一致。

  1. 顺序消息

顺序消息的缺陷:

·发送顺序消息无法利用集群FailOver特性

·消费顺序消息的并行度依赖于队列数量

·队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题

·遇到消费失败的消息,无法跳过,当前队列消费暂停

  1. 发送消息负载均衡

5个队列可以部署在同一台机器上,也可以分别部署在5台不同的机器上,发送消息通过轮询队列的方式发送,每个队列接受平均的消息量。通过增加机器,可以水平扩展队列容量。

  1. 订阅消息负载均衡

如果有5个队列,2个Consumer,那么第一个Consumer消费3个队列,第二个消费2个队列。这样做可以达到平均消费的目的,可以水平扩展Consumer来提高消费能力,但是Consumer的数量要小于队列数量。

  1. 单队列并行消费

单队列并行消费采用滑动窗口方式进行并行消费。

  1. HA,同步双写/异步复制

实现思路非常简单,Slave启动一个线程,不断从Master拉取Commit Log中的数据,然后再异步build出Consumer Queue数据结构。

七、RocketMQ通信组件

  1. 心跳处理

通信组件本身不处理心跳,由上层处理。

  1. 连接复用

同一个网络连接,客户端多个线程可以同时发送请求,应答响应通过header中的opaque字段来标识。

  1. 超时连接

如果某个连接超过特定时间没有活动(无读写事件),则自动关闭此连接,并通知上层事务,清除连接对应的注册信息。

  1. RocketMQ服务发现(Name Server)

Name Server是专为RocketMQ设计的轻量级名称服务,具有简单,可集群横向扩展、无状态等特点。将要支持的主备自动切换功能会强依赖Name Server。