一、消费发送
生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。比如同步发送、异步发送、Oneway发送、延迟发送、发送事务消息等。 默认使用的是DefaultMQProducer类,发送消息要经过五个步骤:
1、设置Producer的GroupName。
2、设置InstanceName,当一个Jvm需要启动多个Producer的时候,通过设置不同的InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。
3、设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次。
4、设置NameServer地址
5、组装消息并发送。
消息发生返回状态(SendResult#SendStatus)有如下四种:
- FLUSH_DISK_TIMEOUT
- FLUSH_SLAVE_TIMEOUT
- SLAVE_NOT_AVAILABLE
- SEND_OK
不同状态在不同的刷盘策略和同步策略的配置下含义是不同的:
- FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置成SYNC_FLUSH才会报这个错误)。
- FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,没有在设定时间内完成主从同步。
- SLAVE_NOT_AVAILABLE:这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。
- SEND_OK:表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave上?消息在Slave上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。
写一个高质量的生产者程序,重点在于对发送结果的处理,要充分考虑各种异常,写清对应的处理逻辑。
提升写入的性能
发送一条消息出去要经过三步
1、客户端发送请求到服务器。
2、服务器处理该请求。
3、服务器向客户端返回应答
一次消息的发送耗时是上述三个步骤的总和。
在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用, 可以采用Oneway方式。
发送Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果。
用这种方式发送消息的耗时可以缩短到微秒级。
另一种提高发送速度的方法是增加Producer的并发量,使用多个Producer同时发送,我们不用担心多Producer同时写会降低消息写磁盘的效率,RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中。
顺序写CommitLog可让RocketMQ无论在HDD还是SSD磁盘情况下都能保持较高的写入性能。
目前在阿里内部经过调优的服务器上,写入性能达到90万+的TPS,我们可以参考这个数据进行系统优化。
在Linux操作系统层级进行调优,推荐使用EXT4文件系统,IO调度算法使用deadline算法。
二、消息消费
简单总结消费的几个要点:
- 消息消费方式(Pull和Push)
- 消息消费的模式(广播模式和集群模式)
- 流量控制(可以结合sentinel来实现,后面单独讲)
- 并发线程数设置
- 消息的过滤(Tag、Key) TagA||TagB||TagC * null
当Consumer的处理速度跟不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有三种方法可以提高Consumer的处理能力。
1、提高消费并行度
在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提高并行度。
通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数。
注意:总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息。
此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加并行度来提高吞吐量(设置方法是修改consumeThreadMin和consumeThreadMax)。
2、以批量方式进行消费
某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间。可以通过批量方式消费来提高消费的吞吐量。实现方法是设置Consumer的consumeMessageBatchMaxSize这个参数,默认是1,如果设置为N,在消息多的时候每次收到的是个长度为N的消息链表。
3、检测延时情况,跳过非重要消息
Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度。
三、消息存储
1、存储介质
关系型数据库DB
Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障
文件系统
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。
2、性能对比
文件系统>关系型数据库DB
3、消息的存储和发送
1、消息存储
目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。
但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!
因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。
RocketMQ的消息用顺序写,保证了消息存储的速度。
2、存储结构
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
消息存储架构图中主要有下面三个跟消息存储相关的文件构成。
1、CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
2、ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能
RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行
如果要遍历commitlog文件根据topic检索消息是非常低效。
Consumer即可根据ConsumeQueue来查找待消费的消息。
其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引:
- 保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset
- 消息大小size
- 消息Tag的HashCode值。
consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:
topic/queue/file三层组织结构
具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
。
consumequeue文件采取定长设计,每个条目共20个字节,分别为:
- 8字节的commitlog物理偏移量
- 4字节的消息长度
- 8字节tag hashcode
单个文件由30W个条目组成,可以像数组一样随机访问每一个条目
每个ConsumeQueue文件大小约5.72M;
3、IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。
- Index文件的存储位置是:HOME/store/index/{fileName}
- 文件名fileName是以创建时的时间戳命名的
- 固定的单个IndexFile文件大小约为400M
- 一个IndexFile可以保存 2000W个索引
- IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
四、过滤消息
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。
主要支持如下3种的过滤方式
1、Tag过滤方式
Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。
Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。
Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。
Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤。
在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
2、SQL92的过滤方式
仅对push的消费者起作用。
Tag方式虽然效率高,但是支持的过滤逻辑比较简单。
SQL表达式可以更加灵活的支持复杂过滤逻辑,这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样
真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。
每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。
SQL92的表达式上下文为消息的属性。
首先需要开启支持SQL92的特性,然后重启broker:
mqbroker -n localhost:9876 -c /opt/rocket/conf/broker.conf
RocketMQ仅定义了几种基本的语法,用户可以扩展:
- 数字比较: >, >=, <, <=, BETWEEN, =
- 字符串比较: =, <>, IN; IS NULL或者IS NOT NULL;
- 逻辑比较: AND, OR, NOT;
- Constant types are: 数字如:123, 3.1415; 字符串如:'abc',必须是单引号引起来 NULL,特殊常量 布尔型如:TRUE or FALSE;
3、Filter Server方式
这是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据Java函数的逻辑对消息进行过滤。
要使用Filter Server,首先要在启动Broker前在配置文件里加上filterServer-Nums=3这样的配置,Broker在启动的时候,就会在本机启动3个Filter Server进程。Filter Server类似一个RocketMQ的Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消息再传给远端的Consumer。
这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。上传的java代码也要经过检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker服务器宕机。
五、零拷贝原理
1、PageCache
- 由内存中的物理page组成,其内容对应磁盘上的block。
- page cache的大小是动态变化的。
- backing store: cache缓存的存储设备
- 一个page通常包含多个block, 而block不一定是连续的。
1、读Cache
- 当内核发起一个读请求时, 先会检查请求的数据是否缓存到了page cache中。
- 如果有,那么直接从内存中读取,不需要访问磁盘, 此即 cache hit(缓存命中)
- 如果没有, 就必须从磁盘中读取数据, 然后内核将读取的数据再缓存到cache中, 如此后续的读请求就可以命中缓存了。
- page可以只缓存一个文件的部分内容, 而不需要把整个文件都缓存进来。
2、写Cache
当内核发起一个写请求时, 也是直接往cache中写入, 后备存储中的内容不会直接更新。
内核会将被写入的page标记为dirty, 并将其加入到dirty list中。
内核会周期性地将dirty list中的page写回到磁盘上, 从而使磁盘上的数据和内存中缓存的数据一致。
3、cache回收
- Page cache的另一个重要工作是释放page, 从而释放内存空间。
- cache回收的任务是选择合适的page释放
- 如果page是dirty的, 需要将page写回到磁盘中再释放。
2、cache和buffer的区别
1、Cache:缓存区,是高速缓存,是位于CPU和主内存之间的容量较小但速度很快的存储器,因为CPU的速度远远高于主内存的速度,CPU从内存中读取数据需等待很长的时间,而 Cache保存着CPU刚用过的数据或循环使用的部分数据,这时从Cache中读取数据会更快,减少了CPU等待的时间,提高了系统的性能。
Cache并不是缓存文件的,而是缓存块的(块是I/O读写最小的单元);Cache一般会用在I/O请求上,如果多个进程要访问某个文件,可以把此文件读入Cache中,这样下一个进程获取CPU控制权并访问此文件直接从Cache读取,提高系统性能。
2、Buffer:缓冲区,用于存储速度不同步的设备或优先级不同的设备之间传输数据;通过buffer可以减少进程间通信需要等待的时间,当存储速度快的设备与存储速度慢的设备进行通信时,存储慢的数据先把数据存放到buffer,达到一定程度存储快的设备再读取buffer的数据,在此期间存储快的设备CPU可以干其他的事情。
Buffer:一般是用在写入磁盘的,例如:某个进程要求多个字段被读入,当所有要求的字段被读入之前已经读入的字段会先放到buffer中。
3、HeapByteBuffer和DirectByteBuffer
HeapByteBuffer,是在jvm堆上面一个buffer,底层的本质是一个数组,用类封装维护了很多的索引(limit/position/capacity等)。
DirectByteBuffer,底层的数据是维护在操作系统的内存中,而不是jvm里,DirectByteBuffer里维护了一个引用address指向数据,进而操作数据。
HeapByteBuffer优点:内容维护在jvm里,把内容写进buffer里速度快;更容易回收
DirectByteBuffer优点:跟外设(IO设备)打交道时会快很多,因为外设读取jvm堆里的数据时,不是直接读取的,而是把jvm里的数据读到一个内存块里,再在这个块里读取的,如果使用DirectByteBuffer,则可以省去这一步,实现zero copy(零拷贝)
外设之所以要把jvm堆里的数据copy出来再操作,不是因为操作系统不能直接操作jvm内存,而是因为jvm在进行gc(垃圾回收)时,会对数据进行移动,一旦出现这种问题,外设就会出现数据错乱的情况。
所有的通过allocate方法创建的buffer都是HeapByteBuffer.
堆外内存实现零拷贝
-
前者分配在JVM堆上(ByteBuffer.allocate()),后者分配在操作系统物理内存上(ByteBuffer.allocateDirect(),JVM使用C库中的malloc()方法分配堆外内存);
-
DirectByteBuffer可以减少JVM GC压力,当然,堆中依然保存对象引用,fullgc发生时也会回收直接内存,也可以通过system.gc主动通知JVM回收,或者通过 cleaner.clean主动清理。Cleaner.create()方法需要传入一个DirectByteBuffer对象和一个Deallocator(一个堆外内存回收线程)。GC发生时发现堆中的DirectByteBuffer对象没有强引用了,则调用Deallocator的run()方法回收直接内存,并释放堆中DirectByteBuffer的对象引用;
-
底层I/O操作需要连续的内存(JVM堆内存容易发生GC和对象移动),所以在执行write操作时需要将HeapByteBuffer数据拷贝到一个临时的(操作系统用户态)内存空间中,会多一次额外拷贝。而DirectByteBuffer则可以省去这个拷贝动作,这是Java层面的 “零拷贝” 技术,在netty中广泛使用;
-
MappedByteBuffer底层使用了操作系统的mmap机制,FileChannel#map()方法就会返回MappedByteBuffer。DirectByteBuffer虽然实现了MappedByteBuffer,不过DirectByteBuffer默认并没有直接使用mmap机制。
4、缓冲IO和直接IO
1、缓存IO
缓存I/O又被称作标准I/O,大多数文件系统的默认I/O操作都是缓存I/O。在Linux的缓存I/O机制中,数据先从磁盘复制到内核空间的缓冲区,然后从内核空间缓冲区复制到应用程序的地址空间。读操作:操作系统检查内核的缓冲区有没有需要的数据,如果已经缓存了,那么就直接从缓存中返回;否则从磁盘中读取,然后缓存在操作系统的缓存中。
写操作:将数据从用户空间复制到内核空间的缓存中。这时对用户程序来说写操作就已经完成,至于什么时候再写到磁盘中由操作系统决定,除非显示地调用了sync同步命令。
缓存I/O的优点:
1、在一定程度上分离了内核空间和用户空间,保护系统本身的运行安全;
2、可以减少读盘的次数,从而提高性能。
缓存I/O的缺点:
1、在缓存 I/O 机制中,DMA 方式可以将数据直接从磁盘读到页缓存中,或者将数据从页缓存直接写回到磁盘上,而不能直接在应用程序地址空间和磁盘之间进行数据传输。数据在传输过程中就需要在应用程序地址空间(用户空间)和缓存(内核空间)之间进行多次数据拷贝操作,这些数据拷贝操作所带来的CPU以及内存开销是非常大的。
2、直接IO
直接IO就是应用程序直接访问磁盘数据,而不经过内核缓冲区,这样做的目的是减少一次从内核缓冲区到用户程序缓存的数据复制。比如说数据库管理系统这类应用,它们更倾向于选择它们自己的缓存机制,因为数据库管理系统往往比操作系统更了解数据库中存放的数据,数据库管理系统可以提供一种更加有效的缓存机制来提高数据库中数据的存取性能。
直接IO的缺点:如果访问的数据不在应用程序缓存中,那么每次数据都会直接从磁盘加载,这种直接加载会非常缓慢。通常直接IO与异步IO结合使用,会得到比较好的性能。
5、内存映射文件(Mmap)
在LINUX中我们可以使用mmap用来在进程虚拟内存地址空间中分配地址空间,创建和物理内存的映射关系。
映射关系可以分为两种
1、文件映射 磁盘文件映射进程的虚拟地址空间,使用文件内容初始化物理内存。
2、匿名映射 初始化全为0的内存空间。
而对于映射关系是否共享又分为
1、私有映射(MAP_PRIVATE) 多进程间数据共享,修改不反应到磁盘实际文件,是一个copy-on-write(写时复制)的映射方式。
2、共享映射(MAP_SHARED) 多进程间数据共享,修改反应到磁盘实际文件中。
因此总结起来有4种组合
1、私有文件映射 多个进程使用同样的物理内存页进行初始化,但是各个进程对内存文件的修改不会共享,也不会反应到物理文件中
2、私有匿名映射 mmap会创建一个新的映射,各个进程不共享,这种使用主要用于分配内存(malloc分配大内存会调用mmap)。 例如开辟新进程时,会为每个进程分配虚拟的地址空间,这些虚拟地址映射的物理内存空间各个进程间读的时候共享,写的时候会copy-on-write。
3、共享文件映射 多个进程通过虚拟内存技术共享同样的物理内存空间,对内存文件 的修改会反应到实际物理文件中,他也是进程间通信(IPC)的一种机制。
4、共享匿名映射 这种机制在进行fork的时候不会采用写时复制,父子进程完全共享同样的物理内存页,这也就实现了父子进程通信(IPC)
mmap只是在虚拟内存分配了地址空间,只有在第一次访问虚拟内存的时候才分配物理内存。
在mmap之后,并没有在将文件内容加载到物理页上,只上在虚拟内存中分配了地址空间。当进程在访问这段地址时,通过查找页表,发现虚拟内存对应的页没有在物理内存中缓存,则产生"缺页",由内核的缺页异常处理程序处理,将文件对应内容,以页为单位(4096)加载到物理内存,注意是只加载缺页,但也会受操作系统一些调度策略影响,加载的比所需的多。
零拷贝(zero copy)小结
1、虽然叫零拷贝,实际上sendfile有2次数据拷贝的。第1次是从磁盘拷贝到内核缓冲区,第二次是从内核缓冲区拷贝到网卡(协议引擎)。如果网卡支持 SG-DMA(The Scatter-GatherDirect Memory Access)技术,就无需从PageCache拷贝至 Socket 缓冲区;
2、之所以叫零拷贝,是从内存角度来看的,数据在内存中没有发生过拷贝,只是在内存和I/O设备之间传输。很多时候我们认为sendfile才是零拷贝,mmap严格来说不算;
3、Linux中的API为sendfile、mmap,Java中的API为FileChanel.transferTo()、FileChannel.map()等;
4、Netty、Kafka(sendfile)、Rocketmq(mmap)、Nginx等高性能中间件中,都有大量利用操作系统零拷贝特性。
六、同步复制和异步复制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。
1、同步复制
同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态;
在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
2、异步复制
异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失;
3、配置
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
/opt/rocket/conf/broker.conf
文件:Broker的配置文件
参数名 | 默认值 | 说明 |
---|---|---|
listenPort | 10911 | 接受客户端连接的监听端口 |
namesrvAddr | null | nameServer 地址 |
brokerIP1 | 网卡的 InetAddress | 当前 broker 监听的 IP |
brokerIP2 | 跟 brokerIP1 一样 | 存在主从 broker 时,如果在 broker主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的brokerIP2 进行同步 |
brokerName | null | broker 的名称 |
brokerClusterName | DefaultCluster | 本 broker 所属的 Cluser 名称 |
brokerId | 0 | broker id, 0 表示 master, 其他的正整数表示 slave |
storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 |
storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 |
mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |
deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |
fileReserverdTime | 72 | 以小时计算的文件保留时间 |
brokerRole | ASYNC_MASTER | SYNC_MASTER或者ASYNC_MASTER或者SLAVE SYNC_MASTER表示当前broker是一个同步复制的Master ASYNC_MASTER表示当前broker是一个异步复制的Master SLAVE表示当前borker是一个Slave。 |
flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。 ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。 |
4、总结
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。
七、高可用机制
RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。
Master和Slave的区别:
1、在Broker的配置文件中,参数brokerId的值为0表明这个Broker是Master,
2、大于0表明这个Broker是Slave,brokerRole参数也说明这个Broker是Master还是Slave。(SYNC_MASTER/ASYNC_MASTER/SALVE)
3、Master角色的Broker支持读和写,Slave角色的Broker仅支持读。
4、Consumer可以连接Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
1、消息消费高可用
在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。
有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。
这就达到了消费端的高可用性。
2、消息发送高可用
如何达到发送端的高可用性呢?
在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样既可以在性能方面具有扩展性,也可以降低主节点故障对整体上带来的影响,而且当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息的。
RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足,需要把Slave转成Master。
1、手动停止Slave角色的Broker。
2、更改配置文件。
3、用新的配置文件启动Broker。
这种早期方式在大多数场景下都可以很好的工作,但也面临一些问题。
比如,在需要保证消息严格顺序的场景下,由于在主题层面无法保证严格顺序,所以必须指定队列来发送消息,对于任何一个队列,它一定是落在一组特定的主从节点上,如果这个主节点宕机,其他的主节点是无法替代这个主节点的,否则就无法保证严格顺序。
在这种复制模式下,严格顺序和高可用只能选择一个。
RocketMQ 在 2018 年底迎来了一次重大的更新,引入 Dledger,增加了一种全新的复制方式。
RocketMQ 引入 Dledger,使用新的复制方式,可以很好地解决这个问题。
Dledger 在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客户端返回写入成功,并且它是支持通过选举来动态切换主节点的。
举例:
假如有3个节点,当主节点宕机的时候,2 个从节点会通过投票选出一个新的主节点来继续提供服务,相比主从的复制模式,解决了可用性的问题。
由于消息要至少复制到 2 个节点上才会返回写入成功,即使主节点宕机了,也至少有一个节点上的消息是和主节点一样的。
Dledger在选举时,总会把数据和主节点一样的从节点选为新的主节点,这样就保证了数据的一致性,既不会丢消息,还可以保证严格顺序。
存在问题:
当然,Dledger的复制方式也不是完美的,依然存在一些不足:
1、比如,选举过程中不能提供服务。
2、最少需要 3 个节点才能保证数据一致性,3 节点时,只能保证 1 个节点宕机时可用,如果 2个节点同时宕机,即使还有 1 个节点存活也无法提供服务,资源的利用率比较低。
3、另外,由于至少要复制到半数以上的节点才返回写入成功,性能上也不如主从异步复制的方式快。
八、刷盘机制
RocketMQ 的所有消息都是持久化的,先写入系统 PageCache,然后刷盘,可以保证内存与磁盘都有一份数据, 访问时,直接从内存读取。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
1、同步刷盘
同步刷盘与异步刷盘的唯一区别是异步刷盘写完 PageCache直接返回,而同步刷盘需要等待刷盘完成才返回, 同步刷盘流程如下:
1、写入 PageCache后,线程等待,通知刷盘线程刷盘。
2、刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。
3、前端等待线程向用户返回成功
2、异步刷盘
在有 RAID 卡,SAS 15000 转磁盘测试顺序写文件,速度可以达到 300M 每秒左右,而线上的网卡一般都为千兆 网卡,写磁盘速度明显快于数据网络入口速度,那么是否可以做到写完内存就向用户返回,由后台线程刷盘呢?
1、由于磁盘速度大于网卡速度,那么刷盘的进度肯定可以跟上消息的写入速度。
2、万一由于此时系统压力过大,可能堆积消息,除了写入 IO,还有读取 IO,万一出现磁盘读取落后情况, 会不会导致系统内存溢出,答案是否定的,原因如下:
- 写入消息到 PageCache时,如果内存不足,则尝试丢弃干净的 PAGE,腾出内存供新消息使用,策略是LRU 方式。
- 如果干净页不足,此时写入 PageCache会被阻塞,系统尝试刷盘部分数据,大约每次尝试 32个 PAGE , 来找出更多干净 PAGE。
综上,内存溢出的情况不会出现。
九、负载均衡
RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。
1、Producer的负载均衡
5 个队列可以部署在一台机器上,也可以分别部署在 5 台不同的机器上,发送消息通过轮询队列的方式 发送,每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。 另外也可以自定义方式选择发往哪个队列。
# 创建主题
# mqadmin updateTopic -n localhost:9876 -t tp_demo_02 -w 6 -b localhost:10911
2、Consumer的负载均衡
如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二consumer 消费 2 个队列。 这样即可达到平均消费的目的,可以水平扩展 Consumer 来提高消费能力。但是 Consumer 数量要小于等于队列数 量,如果 Consumer 超过队列数量,那么多余的Consumer 将不能消费消息 。
在RocketMQ中,Consumer端的两种消费模式(Push/Pull)底层都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。
如果未拉取到消息,则延迟一下又继续拉取。
在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端在知道从Broker端的哪一个消息队列中去获取消息。
因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。
要做负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup里到底有多少个Consumer。
知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各个Consumer。
在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。
Pull Consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。
DefaultMQPullConsumer有两个辅助方法可以帮助实现负载均衡,一个是registerMessageQueueListener函数,一个是MQPullConsumerScheduleService(使用这个Class类似使用DefaultMQPushConsumer,但是它把Pull消息的主动性留给了使用者)
DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个DefaultMQPushConsumer启动后,会马上会触发一个doRebalance动作;而且在同一个ConsumerGroup里加入新的DefaultMQPush-Consumer时,各个Consumer都会被触发doRebalance动作。
负载均衡的分配粒度只到Message Queue,把Topic下的所有Message Queue分配到不同的Consumer中
消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。
十、消息重试
1、顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
2、无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
1、重试次数
消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分钟 |
2 | 30 秒 | 10 | 8 分钟 |
3 | 1 分钟 | 11 | 9 分钟 |
4 | 2 分钟 | 12 | 10 分钟 |
5 | 3 分钟 | 13 | 20 分钟 |
6 | 4 分钟 | 14 | 30 分钟 |
7 | 5 分钟 | 15 | 1 小时 |
8 | 6 分钟 | 16 | 2 小时 |
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
十一、死信队列
RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是:
%RETRY%消费组名称(重试Topic)
%DLQ%消费组名称(死信Topic)
死信队列也可以被订阅和消费,并且也会过期
可视化工具:RocketMQ Dashboard
下载地址:https://github.com/apache/rocketmq-dashboard/archive/refs/tags/rocketmq-dashboard-1.0.0.zip
使用jdk8:
# 编译打包
mvn clean package -Dmaven.test.skip=true
# 运行工具
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
页面设置NameSrv地址即可。如果不生效,就直接修改项目的application.properties中的namesrv地址选项的值。
1、死信特性
死信消息具有以下特性
不会再被消费者正常消费。
有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3天内及时处理。
死信队列具有以下特性:
一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
2、查看死信信息
1、在控制台查询出现死信队列的主题信息
2、在消息界面根据主题查询死信消息
3、选择重新发送消息
一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。
十二、延迟消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
- level == 0,消息为非延迟消息
- 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
- level > maxLevel,则level== maxLevel,例如level==20,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
查看SCHEDULE_TOPIC_XXXX主题信息:
# pwd
/root/store/consumequeue/SCHEDULE_TOPIC_XXXX
# ls
0 1 10 11 12 13 14 15 16 17 2 3 4 5 6 7 8 9
#
十三、顺序消息
顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生成、付款、发货,这3个消息必须按顺序处理才行。
顺序消息分为全局顺序消息和部分顺序消息:
1、全局顺序消息指某个Topic下的所有消息都要保证顺序;
2、部分顺序消息只要保证每一组消息被顺序消费即可,比如上面订单消息的例子,只要保证同一个订单ID的三个消息能按顺序消费即可。
在多数的业务场景中实际上只需要局部有序就可以了。
RocketMQ在默认情况下不保证顺序,比如创建一个Topic,默认八个写队列,八个读队列。这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个Consumer,每个Consumer也可能启动多个线程并行处理,所以消息被哪个Consumer消费,被消费的顺序和写入的顺序是否一致是不确定的。
要保证全局顺序消息,需要先把Topic的读写队列数设置为一,然后Producer和Consumer的并发设置也要是一。简单来说,为了保证整个Topic的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理。
部分有序:
顺序消息的生产和消费:
# 创建主题,8写8读
# mqadmin updateTopic -b node1:10911 -n localhost:9876 -r 8 -t tp_demo_07 -w 8
# 删除主题的操作:
# mqadmin deleteTopic -c DefaultCluster deleteTopic -n localhost:9876 -t tp_demo_07
# 主题描述
# mqadmin topicStatus -n localhost:9876 -t tp_demo_07
全局有序:
顺序消息的生产和消费:
# 创建主题,8写8读
# mqadmin updateTopic -b node1:10911 -n localhost:9876 -r 1 -t tp_demo_07_01 -w 1
# 删除主题的操作:
# mqadmin deleteTopic -c DefaultCluster deleteTopic -n localhost:9876 -t tp_demo_07_01
# 主题描述
# mqadmin topicStatus -n localhost:9876 -t tp_demo_07_01
十四、事务消息
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账,A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银行账户扣除一万元”这个操作同时成功或者同时失败。
RocketMQ采用两阶段提交的方式实现事务消息,TransactionMQProducer处理上面情况的流程是,先发一个“准备从B银行账户增加一万元”的消息,发送成功后做从A银行账户扣除一万元的操作,根据操作结果是否成功,确定之前的“准备从B银行账户增加一万元”的消息是做commit还是rollback,具体流程如下:
1、发送方向RocketMQ发送“待确认”消息。
2、RocketMQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功,此时第一阶段消息发送完成。
3、发送方开始执行本地事件逻辑。
4、发送方根据本地事件执行结果向RocketMQ发送二次确认(Commit或是Rollback)消息,RocketMQ收到Commit状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息;收到Rollback状态则删除第一阶段的消息,订阅方接收不到该消息。
5、如果出现异常情况,步骤4)提交的二次确认最终未到达RocketMQ,服务器在经过固定时间段后将对“待确认”消息发起回查请求。
6、发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer),通过检查对应消息的本地事件执行结果返回Commit或Roolback状态。
7、RocketMQ收到回查请求后,按照步骤4)的逻辑处理。
十六、消息优先级
有些场景,需要应用程序处理几种类型的消息,不同消息的优先级不同。RocketMQ是个先入先出的队列,不支持消息级别或者Topic级别的优先级。业务中简单的优先级需求,可以通过间接的方式解决,下面列举三种优先级相关需求的具体处理方法。
第一种
多个不同的消息类型使用同一个topic时,由于某一个种消息流量非常大,导致其他类型的消息无法及时消费,造成不公平,所以把流量大的类型消息在一个单独的 Topic,其他类型消息在另外一个Topic,应用程序创建两个 Consumer,分别订阅不同的 Topic,这样就可以了。
第二种
情况和第一种情况类似,但是不用创建大量的 Topic。举个实际应用场景: 一个订单处理系统,接收从 100家快递门店过来的请求,把这些请求通过 Producer 写入RocketMQ;订单处理程序通过Consumer 从队列里读取消 息并处理,每天最多处理 1 万单 。 如果这 100 个快递门店中某几个门店订单量 大增,比如门店一接了个大客户,一个上午就发出 2万单消息请求,这样其他 的 99 家门店可能被迫等待门店一的 2 万单处理完,也就是两天后订单才能被处 理,显然很不公平 。
这时可以创建 一 个 Topic, 设置 Topic 的 MessageQueue 数 量 超过 100 个,Producer根据订单的门店号,把每个门店的订单写人 一 个 MessageQueue。 DefaultMQPushConsumer默认是采用循环的方式逐个读取一个 Topic 的所有 MessageQueue,这样如果某家门店订单量大增,这家门店对应的 MessageQueue 消息数增多,等待时间增长,但不会造成其他家门店等待时间增长。
DefaultMQPushConsumer 默认的 pullBatchSize 是 32,也就是每次从某个 MessageQueue 读取消息的时候,最多可以读 32 个 。 在上面的场景中,为了更 加公平,可以把 pullBatchSize 设置成1。
第三种
强制优先级
TypeA、 TypeB、 TypeC 三类消息 。 TypeA 处于第一优先级,要确保只要有TypeA消息,必须优先处理; TypeB处于第二优先 级; TypeC 处于第三优先级 。 对这种要求,或者逻辑更复杂的要求,就要用 户自己编码实现优先级控制,如果上述的 三 类消息在一个 Topic 里,可以使 用 PullConsumer,自主控制 MessageQueue 的遍历,以及消息的读取;如果上述三类消息在三个 Topic下,需要启动三个Consumer, 实现逻辑控制三个 Consumer 的消费 。
十七、底层网络通信 - Netty高性能之道
RocketMQ底层通信的实现是在Remoting模块里,因为借助了Netty而没有重复造轮子,RocketMQ的通信部分没有很多的代码,就是用Netty实现了一个自定义协议的客户端/服务器程序。
- 1、自定义ByteBuf可以从底层解决ByteBuffer的一些问题,并且通过“内存池”的设计来提升性能
- 2、Reactor主从多线程模型
- 3、充分利用了零拷贝,CAS/volatite高效并发编程特性
- 4、无锁串行化设计
- 5、管道责任链的编程模型
- 6、高性能序列化框架的支持
- 7、灵活配置TCP协议参数
RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:
(1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。
(2) 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。
(3) 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。
(4) 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。
从上面1)~3)中可以看出在消息生产者, Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。
rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。
RocketMQ中惯用的套路:
请求报文和响应都使用RemotingCommand,然后在Processor处理器中根据RequestCode请求码来匹配对应的处理方法。
处理器通常继承至NettyRequestProcessor,使用前需要先注册才行,注册方式remotingServer.registerDefaultProcessor
网络通信核心的东西无非是:
线程模型
私有协议定义
编解码器
序列化/反序列化
…
既然是基于Netty的网络通信,当然少不了一堆自定义实现的Handler,例如继承至:SimpleChannelInboundHandler ChannelDuplexHandler
评论区