kafka日志存储
kafka中的topic是逻辑上的概念,而真正存储消息的是分区,更具体点来说是分区中的segment,每个分区可分为多个segment,存储的消息称为日志。
1、LogSegment
一个partition分区拥有多个logsegment,kafka日志追加是顺序写入的,把分区分为多个日志段可减小日志文件的大小,在进行日志删除和数据读取的时候也可以快速定位。需要注意的是只有活跃的日志段拥有文件的写入权限,其余的都只有读取权限。
日志文件存在多个后缀的文件,重点需要关注的有.index,.timeindex,.log三个。每个LogSegment都有一个基准偏移量,用于表示当前LogSegment文件的第一条消息的offset,固定长度是20位数字,长度未达到用0填充,索引文件和日志文件都由此命名规则命名如000000000000000000100.index、00000000000000000100.timeindex、00000000000000000100.log。
2、日志格式演变
2.1、v0版本
上图RECORD部分即为v0版本的消息格式,在0.10.0版本之前都是采用的此种消息格式。LOG_OVERHEAD和RECORD一期来描述一条消息,与消息对应的是消息集的概念,消息集中包含多条消息,消息集不仅是存储于磁盘以及在网络上传输(Produce和Fetch)的基本形式,而且是kafka中压缩的基本单元。
- crc32, 4字节,crc32校验值,校验范围从magic到value
- magic,1字节,消息格式版本号,此版本magic值为0
- attributes,1字节,表示消息的属性,第三位表示消息的压缩类型,0为NONE,1为GZIP,2为SNAPPY,3为LZ4,其余位保留
- key length,消息的key的长度,如果值为-1,表示此消息无key,即key=null
- key,可选,如果没有key则无此字段
- value length,实际消息体的长度,如果为-1,表示消息为空
- value,消息体,可以为空。
v0版本的消息最小长度为crc32+magic+attributes+key length+value length = 4+1+1+4+4=14B,当消息的长度小于这个值,则可认为这条消息是破损的而不被接受。
2.2、v1版本
v1版本的magic值为1,v1版本的消息最小长度为14+8=22B,kafka从0.10.0版本开始到0.11.0之前使用的消息格式都是v1
消息压缩
kafka发送消息并不是一条一条发送的,而是通过消息集(message set)批量发送的,为了提高性能,kafka支持压缩消息的功能,具体来说就是压缩消息集。压缩解压过程是:生产者发送压缩消息集,broker端保存压缩的消息集,消费者解压消息集进行消费。这样就能减少网络IO消耗提升性能。
生产者可以通过配置compression.type来开启压缩功能,支持的压缩算法有GZIP、LZ4、SNAPPY三种。压缩消息时,将整个消息集进行压缩作为一个内层消息,这个内层消息作为外层消息的value,并将原来消息集最大的offset作为外层消息的offset,而内层消息的offset永远从0开始,如下图:
对于压缩的情形,外层消息的timestamp设置为:
- 如果timestamp类型是CreateTime,那么设置的是内层消息中最大的时间戳(the max timestamp of inner messages if CreateTime is used)
- 如果timestamp类型是LogAppendTime,那么设置的是kafka服务器当前的时间戳。
内层消息的timestamp设置为:
- 如果外层消息的timestamp类型是CreateTime,那么设置的是生产者创建消息时的时间戳
- 如果外层消息的timestamp类型是LogAppendTime,那么所有的内层消息时间戳都将被忽略
2.3、v2版本
kafka从0.11.0版本开始使用v2版本的消息格式,v2版本引入了变长整型(Varints)和ZigZag编码,Varints是使用一个或多个字节来序列化整数的一种方法,数值越小,其所占用的字节数就越少。ZigZag编码以一种锯齿状的方式来回穿梭于正负整数之间,以使得带符号整数映射为无符号整数,这样可使得绝对值较小的负数仍享有较小的Varints编码值。
如果没有使用Varints编码,如v0和v1,当key length = -1时,int类型的需要4个字节来保存,而使用Varints只需要一个字节,根据Varints的规则可以推导出0-63之间的数字占1个字节,64-8191之间的数字占2个字节,8192-1048575之间的数字占3个字节。而kafka broker的配置message.max.bytes的默认大小为1000012(Varints编码占3个字节),如果消息格式中与长度有关的字段采用Varints的编码的话,绝大多数情况下都会节省空间,而v2版本的消息格式也正是这样做的。
v2版本引入了新的概念Record Batch,而摒弃了先前的Message Set,在消息压缩的情形下,Record Batch Header部分(从first offset到records count字段)是不被压缩的,而被压缩的是records字段中的所有内容。
关于Record字段描述:
- length:消息总长度
- attributes:弃用,但是还是在消息格式中占据1B的大小,以备未来的格式扩展
- timestamp delta:时间戳增量。通常一个timestamp需要占用8个字节,如果像这里保存与RecordBatch的其实时间戳的差值的话可以进一步的节省占用的字节数
- offset delta:位移增量。保存与RecordBatch起始位移的差值,可以节省占用的字节数
- headers:这个字段用来支持应用级别的扩展,而不需要像v0和v1版本一样不得不将一些应用级别的属性值嵌入在消息体里面。Header的格式如上图最有,包含key和value,一个Record里面可以包含0至多个Header
RecordBatch字段描述:
- first offset:表示当前RecordBatch的起始位移
- length:计算partition leader epoch到headers之间的长度
- partition leader epoch:用来确保数据可靠性
- magic:消息格式的版本号,对于v2版本而言,magic等于2
- attributes,消息属性,注意这里占用了两个字节。低3位表示压缩格式,可以参考v0和v1;第4位表示时间戳类型;第5位表示此RecordBatch是否处于事务中,0表示非事务,1表示事务。第6位表示是否是Control消息,0表示非Control消息,而1表示是Control消息,Control消息用来支持事务功能
- last offset delta:RecordBatch中最后一个Record的offset与first offset的差值。主要被broker用来确认RecordBatch中Records的组装正确性
- first timestamp:RecordBatch中第一条Record的时间戳
- max timestamp:RecordBatch中最大的时间戳,一般情况下是指最后一个Record的时间戳,和last offset delta的作用一样,用来确保消息组装的正确性
- producer id:用来支持幂等性
- producer epoch:和producer id一样,用来支持幂等性
- first sequence:和producer id、producer epoch一样,用来支持幂等性
- records count:RecordBatch中Record的个数
3、日志索引
kafka的日志分段文件包括了两个索引文件:偏移量索引文件和时间戳索引文件。其中偏移量索引文件是为了建立消息偏移量与物理地址之间的映射关系,时间戳索引文件则是用来建立时间戳与偏移量的映射关系。索引文件都是以稀疏索引的方式构建的。
偏移量索引文件是单调递增的,查询时通过二分查找的方式快速定位到小于制定偏移量的最大偏移量,然后根据对应的物理地址找到对应的消息。时间索引文件中的时间戳也是单调递增的,查询时先定位到小于指定时间的最大时间戳,根据此时间戳对应的偏移量去偏移量索引文件中找到对应的消息。
3.1、偏移量索引
偏移量索引文件中的每个索引占8个字节,分为两部分
- relative offset,相对偏移量,占4字节,表示消息相对于baseOffsset的偏移量,这个baseOffset即为此日志文件的名
- position,物理地址,占4字节,表示消息在分段日志文件中的物理地址
使用偏移量索引查找消息过程:
- 因为偏移量索引文件文件名是baseOffset,所以可以使用二分法快速定位到指定的偏移量索引文件
- 找到索引文件后,在索引文件中利用二分法找到特定的索引项(小于当前偏移量的最大偏移量),进而找到此索引相对应的物理地址,最后顺序查找出对应偏移量的消息。
3.2、时间戳索引
时间戳索引占12个字节,分为两部分
- timestamp,当前日志分段的最大时间戳
- relative offset,时间戳对应的消息相对偏移量
根据时间戳查找对应消息过程
- 根据时间戳找到每个日志分段的文件中最大的时间戳逐一比较,定位到时间戳索引文件。由于时间戳索引文件也是根据baseOffset命名的,所以这一步无法使用二分法
- 在时间戳索引文件中使用二分法定位到时间戳索引项
- 根据时间戳索引项对应的偏移量使用偏移量索引找出对应的消息。
3.3、二分查找算法
kafka写入索引文件的方式是在文件末尾追加写入的,而几乎所有的索引查询都发生在索引尾部。kafka原本的二分查找并没有考虑到缓存的问题,可能会导致一些不必要的缺页中断,kafka线程会被阻塞等待索引项从磁盘读取并放入页缓存中。
上图kafka的某个索引占用了操作系统页缓存的13个页,如果待查找的位移值在最后一个页上则二分查找会一次读取编号为0、6、9、11、12的页。
通常情况下,一个页会存放成百上千的索引项,锁着索引文件被不断写入,page 12被不断地填充新的索引项。如果此时索引查询方都来自ISR副本集合或Lag很小的消费者,那么这些查询大都集中在对page 12 的查询上,此时page 0、6、9、11、12页一定或被经常性的访问,所以要把他们保存在页缓存上。
当page 12 空间被使用完了之后,会出现新的页,记为page 13,此时要查询最新的索引项所使用的页缓存为page 0、7、10、12、13,由于page 7和10很长时间没有使用过了,所以很可能会不在页缓存中,因此会出现缺页异常需要从磁盘中加载进页缓存中,这个过程可能会造成几毫秒至一秒的延迟。
基于上述问题,提出了缓存友好的二分算法,整体思路就是将所有索引项分为两个部分:热区和冷区。
这个改进版算法的最大的优点在于查询最热那部分数据所遍历的page永远是固定的,因此大概率在页缓存中,从而避免无意义的缺页异常。
改进版二分查找算法流程:
- 如果索引为空,直接返回<-1, -1>对;
- 确认热区第一个索引项位于哪个槽,_warmEntries参数就是所谓的分割线,目前固定为8192字节处,如果是OffsetIndex,_warmEntries = 8192 / 8 = 1024,即第1024个槽,如果是TimeIndex,_warmEntries = 8192 / 12 = 682,即第682个槽;
- 判断target位移在热区还是在冷区;
- 确保target位移值不能小于当前最小位移值;
- 如果在冷区,搜索冷区。
通过索引查找消息复杂度为 O(log2n)+O(m), n是索引文件里索引的个数,m为稀疏程度。
4、日志清理之Log Deletion
可以通过设置broker端的参数log.cleanup.policy值来设置日志清理策略,此参数默认值为delete,当然也可设置为compact,如果将log.cleanup.policy设置为compact,还需要将log.cleanup.enable(默认为true)设置为true。除此之外,log.cleanup.policy的值还可设置为“delete,compact”同时支持日志删除和日志压缩。
kafka日志管理器中有一个专门的日志删除任务来周期性检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端的参数log.retention.check.interval.ms来配置,默认为300000,也即是5分钟检测一次。当前的日志分段保留策略有三种:基于时间、基于日志大小和基于日志起始偏移量的保留策略。
4.1、基于时间
基于此策略的日志删除会检查当前日志文件中是否保留有超过了设定的阈值(retentionMs)来寻找可删除的日志分段文件集合deletableSegments。retentionMs可以通过broker端参数log.retention.hours、log.retention.minutes、log.retention.ms来配置,其中ms优先级最高,minute次之,hour最低,默认情况下配置了hour为168,也就是默认情况下日志分段文件会被保留7天。
查找过期的日志分段文件,并不是简单的根据日志分段的最近修改时间lastModifyTime来计算,而是根据日志分段中最大的时间戳largestTimeStamp来计算。因为日志分段的lastModifyTime可以被修改,如分区副本被重新分配。获取largestTimeStamp要先查询该日志分段对应的时间戳索引文件,查找时间戳索引文件的最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取其值,否则设置为lastModifyTime
如果此分区所有的日志分段文件都被标记为删除,此时会先切分出一个新的日志分段作为activeSegment勇于接受新消息的写入,然后在执行之前的删除操作。
日志分段删除流程如下:
- 从日志对象文件中所维护的跳表中移除待删除的日志分段,以确保没有线程对这些日志分段进行读取操作
- 将待删除的日志分段文件加上“.delete”后缀,包括其对应的索引文件
- 最后交由一个以”delete-file”命名的延迟任务来删除这些“.delete”为后缀的文件,这个延迟任务可以通过file.delete.delay.ms来设置,默认为60000,即1min
4.2、基于日志大小
broker端可以设置log.retention.bytes的值来设置一个分区日志文件的大小,默认为-1表示无穷大,这个值设置的是总的日志分段大小,而不是一个日志分段。
此策略的删除过程首先会计算需要删除的日志大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段的文件集合deletableSegments,然后删除,删除操作如4.1
4.3、基于日志起始偏移量
基于日志起始偏移量的删除策略的判断依据是某日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset,若是则可以删除此日志分段。
如上图,此时会删除前两个日志分段
5、日志清理之Log Compaction
Log Compaction是日志压紧、压实,相同key的不同value值只保留最后一个版本,也就是kafka定期将相同key的消息进行合并,只保留最新的value值。
6、切分文件的时机
日志文件和索引文件都会存在多个文件,组成多个SegmentLog,其切分规则如下:
- 当前日志分段文件的大小超过了broker端参数log.segment.bytes配置的值,默认值为1G
- 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于log.roll.ms或log.roll.hours参数配置的值,默认只配置了hour为186,即7天
- 偏移量索引文件或时间戳索引文件的大小达到了broker端参数log.index.size.max.bytes配置的值,默认为10MB
- 追加消息的偏移量与当前日志分段的偏移量之间的差值大于Integer.MAX_VALUE(4字节),即要追加的消息的偏移量不能转变为相对偏移量。
7、零拷贝
kafka速度快的原因:
- 多个partition可被并行读取
- 顺序写磁盘
- ”零拷贝“
零拷贝
DMA(Direct Memory Access):直接存储器访问,一种可让某些硬件子系统去直接访问系统主内存,而不用依赖CPU的计算机系统的功能。也就是跳过CPU,直接访问主内存。传统的内存访问都需要通过CPU的调度来完成,DMA技术出现后使系统 CPU 从实际的 I/O 数据传输过程中摆脱出来,从而大大提高系统的吞吐率。
很多硬件都支持DMA,其中就包括网卡。实际IO读写,需要进行IO中断,需要CPU响应中断(带来上下文切换),尽管后来引入DMA来接管CPU的中断请求,但四次copy是存在“不必要的拷贝”的。在这两次CPU拷贝中,应用缓存除了缓存数据并将其传输回socket缓冲区之外并没有实际做什么,所以这两步是可以做优化的,传统IO方式,会注意到实际上并不需要第二个和第三个数据副本。
说明:这种优化掉两次cpu拷贝的场景,是在读取磁盘的文件后不需要进一步的处理,直接通过网络发送出去,kafka落盘操作就是这种情况,只是把数据写入磁盘,而不对数据进行操作。如果读取到的磁盘数据需要应用程序进一步处理的话,则必须要经过第二次和第三次拷贝,让数据在应用缓冲区处理。
Memory Mapped Files,简称为mmap,他的作用就是将磁盘文件映射到内存,通过修改内存就能修改磁盘文件。实现方式就是通过页表的方式实现文件到内存的映射,映射完成后对物理内存的操作会被同步到磁盘上(在合适的时候有操作系统完成)。
通过mmap,进程想读写硬盘一样读写内存,这个内存是虚拟内存,所以不必担心实际内存的大小,因为有虚拟内存兜底。使用这种方式I/O提升很大,省去了内核空间到用户空间复制的开销。
mmap实现的”零拷贝”将上下文的切换次数从4次减少为了两次,拷贝次数从4次降低到了3次。
sendfile函数,Linux从2.1开始引入此函数用于将文件通过socket发送,刚开始时和mmap没有区别,Linux2.4做了进一步的优化。