kafka生产者
1、生产者
1.1、batch
生产者在发送消息时不会将每一条消息都单独发送到broker中,而是采取了批量发送以此减少网络请求次数和磁盘IO次数,消息会先发到缓冲区,这个缓冲区的大小是可配置的,在缓冲区中将消息进行分批,缓冲区大小默认为32M,当缓冲区使用完后消息的追加将会被阻塞直到有空闲的区域。
batch.size: 此参数配置发送到同一分区的批量提交的数据的大小,默认为16k,当消息积压到这个值就会批量发送,增大batch可以增加吞吐量,减小batch客降低消息延迟。
linger.ms:但生产者也并不是非要等到大小挤压到一个批次大小才会发送,此参数就是等待一个批次大小的时间,linger.ms参数控制batch最大的空闲时间,超过该时间的batch也会被发送到broker端。
说明:当这两个参数同时设置的时候,只要两个条件中满足一个就会发送。比如说batch.size设置16kb,linger.ms设置50ms,那么当消息积压达到16kb就会发送,如果没有到达16kb,那么在第一个消息到来之后的50ms之后消息将会发送。
2、同步发送与异步发送
2.1、同步方式
同步方式发送消息必需要等结果返回。
2.2、异步方式
生产者将消息发送给kafka之后不会等待结果返回,调用者可以继续其他的工作。对sarama函数库来说异步发送需要注意的地方就是Return.Successes的设置,如果没有将Return.Successes设置为true,而且消息发送的过程中没有出现错误,则在select块中要设置default,否则会挂住,如下:
1 |
|
因为如果没有default语句,则上面的两个case都不会匹配到,就会挂在这。如果不写default语句,则Return.Successes要置为true。
此外还需考虑的一个问题就是当设置了Return.Successes的值为true,但是没有使用producer.Successes(),也会出现为题,因为随着生产消息的成功,success的chan会被写满(kafka内部控制大小),写满的结果就是导致后面的success结果丢失,并且会导致producer不可用。
所以config.Producer.Return.Successes = true和操作<-producer.Successes()必须配套使用;配置成true,那么就要去读取Successes,如果配置成false,则不能去读取Successes。
3、分区策略
指明分区序号,会直接发送到被指定的分区中
没有指明分区序号,但消息存在key值,则会将key的hash值与topic的分区数取余得到分区序号
即没有partition序号,有没有key值的情况会在第一次调用时随机生成一个小于分区数的值,后面每次调用都会加一并与分区数取余作为分区号,此种方式即为round-robin方式
4、ack应答机制
ISR(in-sync replica set)
对于leader分区,它维护了一个ISR集合,记录了和leader保持同步的foller集合,当ISR中的follower完成同步之后会向leader发送ACK。当follower长时间未向leader同步消息,此follower会被踢出ISR,该函数阈值可有replica.lag.time.max.ms(broker)参数设定,当leader发生故障之后会被从ISR中移除,然后从剩下的ISR中选出一个作为leader。
4.1、NoResponse (0)
当ack设置为NoResponse时,producer不等待broker的ack,这一操作提供了一个最低的延迟,broker还没来的写入磁盘就返回,设置为此值延迟最低。
4.2、WaitForLocal (1)
ack设置为1时,会等待leader分区落盘成功后返回ack,如果leader落盘失败会重试,如果follower同步之前leader出现故障,数据将会丢失。
4.3、WaitForAll (-1)
ack设置为-1,表示所有的副本成功落盘后才返回ack,但是当所有的分区副本同步成功后在leader分区返回ack之前leader分区出现故障,则会导致producer重试,消息重复,可靠性最高。