HighWaterMark与副本同步机制

0.11版本之前依赖于HW实现副本同步备份,0.11之后通过Leader epoch实现副本同步备份。

两个概念:

  • LEO:log end offset,日志末端位移,记录了该副本底层日志下一条消息应存放的位置。
  • HW,高水位值,小于等于HW的消息是被认为已经备份的。

在这里插入图片描述
[0, 7]的消息是已经完全备份,[8, 15)的数据为尚未完全备份的。

kafka中所有的副本都保有自己的HW值和LEO值,但leader副本中还保存有其他副本的HW值和LEO值,如下图:
在这里插入图片描述
leader中保存的副本的HW值和LEO值得作用是用来更新leader副本的HW值。

1、更新HW值的时机

  • follower副本:follower副本会在更新LEO值之后尝试更新HW值。
  • leader副本:
    • 当某个follower副本成为leader副本时;
    • broker崩溃导致副本被踢出ISR
    • producer向leader副本写消息时,由于LEO会被更改,因此会检查下HW是否需要更新;
    • leader处理follower的fetch请求时,首先会从log取数据,然后会尝试更新HW值

1.1、leader副本更新HW

处理生产者请求的逻辑:

  • 写入消息到本次磁盘
  • 更新分区高水位
    • 获取leader副本所在broker端保存的所有远程副本LEO值(LEO-0, LEO-1,… … LEO-n)
    • 获取leader副本高水位值,currentHW
    • 更新currentHW=max{currentHW,min(LEO-1,LEO-2,…..,LEO-n)}

处理follower副本拉取消息的处理逻辑:

  • 读取磁盘或页缓存中的数据
  • 使用follower副本发送请求中位移值更新leader副本LEO值
  • 更新分区高水位值,具体步骤与处理生产者请求的步骤相同

1.2、follower副本更新HW

  • 写入消息到log文件
  • 更新LEO值
  • 更新高水位
    • 获取leader的高水位值:currentHW
    • 获取步骤2刚才更新的LEO值,currentLEO
    • 更新高水位值,min(currnetHW,currentLEO)

2、副本同步机制

当生产者生产一条消息,leader和follower副本高水位更新如下:
初始状态时,所有的值都为0
在这里插入图片描述
生产者发送一条消息后
在这里插入图片描述
当leader副本将消息写入磁盘后LEO值变为1
follower再次从leader拉取消息,有消息拉取到,则状态进一步更新
在这里插入图片描述
可以看到此时的副本LEO也被更新为1,但是leader副本和follower副本的HW仍然没有更新,这会在下一次follower的fetch请求时更新。

在这里插入图片描述
在新一轮的拉取中,由于位移值时0的消息已经拉取成功,则本次拉取会拉去位移值为1的消息,leader副本接收到请求后,会将remote LEO值更新为1,然后更新HW值为1,将更新后的HW值发送到follower,follower将HW值更新为1,致辞一个完整的同步过程完成。

上述的拉取流程可以看到kafka使用HW来决定副本备份的进度,而HW值的更新需要新一轮的Fetch请求才能实现,这种设计是有问题的,可能会引起备份数据丢失和备份数据不一致的问题。

3、Leader epoch

备份数据丢失的问题
在这里插入图片描述
当生产者发送两条消息leader副本都完成落盘,而且follower副本B的HW已经更新为1,在第二条消息的同步时,fetch请求的响应失败,则B的HW就不会被更新到2,如果此时B重启,则B在重启后会重新加载之前的HW值,也就是1,此时副本B会做日志截断把2的消息删除,并调整LEO为1。
B会向A的leader副本发出同步消息的fetch请求,如果此时A恰好宕机,则B升级为leader,而当A重启回来后也会执行日志截断,将HW调整回1与B相同。这样,位移=1的消息就从两个副本的log中被删除,即永远地丢失了。

备份数据不一致问题

在这里插入图片描述
A为leader,B为follower,B的HW只更新到1,此时A、B同事宕机,但是B先恢复,此时B为leader,在A恢复之前又向B写入了新的消息2,此时B的HW为2,此时A恢复,A的HW与B的HW相等,不会出现日志截断,之后会正常的执行下去,但是副本A与leader副本B数据存在不一致的地方。

针对上述的两个问题,kafka 0.11版本对此进行了优化,采用leader epoch技术来解决上述问题。

epoch,单调递增的版本号,每当副本领导权发生改变时都会增加该版本号,小版本号的leader被认为是过期的leader,不能再行使leader的权力。它其实是一对值,即(epoch, offset),offset为该epoch版本对应的leader写入的第一条消息的位移。
(0, 0)
(1, 120)
则表示第一个leader从位移0开始写入消息;共写了120条[0, 119];而第二个leader版本号是1,从位移120处开始写入消息.

​​leader broker中会保存这样的一个缓存,并定期地写入到一个checkpoint文件中。

当leader写底层log时它会尝试更新整个缓存——如果这个leader首次写消息,则会在缓存中增加一个条目;否则就不做更新。而每次副本重新成为leader时会查询这部分缓存,获取出对应leader版本的位移,这就不会发生数据不一致和丢失的情况。
在这里插入图片描述
在这里插入图片描述