今天和大家聊一下,kafka
对于消息的可靠性保证。作为消息引擎组件,保证消息不丢失,是非常重要的。
那么kafka
是如何保证消息不丢失的呢?
前提条件
任何消息组件不丢数据都是在特定场景下一定条件的,kafka
要保证消息不丢,有两个核心条件。
第一,必须是已提交的消息
,即committed message
。kafka
对于committed message
的定义是,生产者提交消息到broker
,并等到多个broker
确认并返回给生产者已提交的确认信息。而这多个broker
是由我们自己来定义的,可以选择只要有一个broker
成功保存该消息就算是已提交,也可以是令所有broker
都成功保存该消息才算是已提交。不论哪种情况,kafka
只对已提交的消息做持久化保证。
第二,也就是最基本的条件,虽然kafka
集群是分布式的,但也必须保证有足够broker
正常工作,才能对消息做持久化做保证。也就是说 kafka
不丢消息是有前提条件的,假如你的消息保存在 N 个kafka broker
上,那么这个前提条件就是这 N 个broker
中至少有 1 个存活。只要这个条件成立,kafka
就能保证你的这条消息永远不会丢失。
如何保证消息不丢
一条消息从产生,到发送到kafka
保存,到被取出消费,会有多个场景和流程阶段,可能会出现丢失情况,我们聊一下kafka
通过哪些手段来保障消息不丢。
生产端
Producer
端可能会丢失消息。目前Kafka Producer
是异步发送消息的,也就是说如果你调用的是producer.send(msg)
这个API
,那么它通常会立即返回,但此时你不保证消息发送已成功完成。可能会出现:网络抖动,导致消息压根就没有发送到Broker
端;或者消息本身不合规导致Broker
拒绝接收(比如消息太大了,超过了Broker
的限制)。
实际上,使用producer.send(msg, callback)
接口就能避免这个问题,根据回调,一旦出现消息提交失败的情况,就可以有针对性地进行处理。如果是因为那些瞬时错误,Producer
重试就可以了;如果是消息不合规造成的,那么调整消息格式后再次发送。总之,处理发送失败的责任在Producer
端而非Broker
端。当然,如果此时broker
宕机,那就另当别论,需要及时处理broker
异常问题。
消费端
Consumer
端丢数据的情况,稍微复杂点。Consumer
有个”位移“(offset
)的概念,表示Consumer
当前消费到topic
分区的哪个位置。如图:
kafka
通过先消费消息,后更新offset
,来保证消息不丢失。但是这样可能会出现消息重复的情况,具体如何保证only-once
,后续再单独分享。
当我们consumer
端开启多线程异步去消费时,情况又会变得复杂一些。此时consumer
自动地向前更新offset
,假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于consumer
而言实际上是丢失了。这里的关键就在自动提交offset
,如何真正地确认消息是否真的被消费,再进行更新offset
。
这个问题的解决起来也简单:如果是多线程异步处理消费消息,consumer
不要开启自动提交offset
,consumer
端程序自己来处理offset
的提交更新。提醒你一下,单个consumer
程序使用多线程来消费消息说起来容易,写成代码还是有点麻烦的,因为你很难正确地处理offset
的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。
实践配置
最后分享下kafka
无消息丢失配置:
producer
端使用producer.send(msg, callback)
带有回调的send
方法。- 设置
acks = all
。acks
是Producer
的一个参数,代表“已提交”消息的定义。如果设置成all
,则表明所有Broker
都要接收到消息,该消息才算是“已提交”。 - 设置
retries
为一个较大的值。同样是Producer
的参数。当出现网络抖动时,消息发送可能会失败,此时配置了retries
的Producer
能够自动重试发送消息,尽量避免消息丢失。 - 设置
unclean.leader.election.enable = false
。这是Broker
端的参数,在kafka
版本迭代中社区也多次反复修改过他的默认值,之前比较具有争议。它控制哪些Broker
有资格竞选分区的Leader
。如果一个Broker
落后原先的Leader
太多,那么它一旦成为新的Leader
,将会导致消息丢失。故一般都要将该参数设置成false
。 - 设置
replication.factor >= 3
。这也是Broker
端的参数。保存多份消息冗余,不多解释了。 - 设置
min.insync.replicas > 1
。Broker
端参数,控制消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在生产环境中不要使用默认值 1。确保replication.factor > min.insync.replicas
。如果两者相等,那么只要有一个副本离线,整个分区就无法正常工作了。推荐设置成replication.factor = min.insync.replicas + 1
。 - 确保消息消费完成再提交。
Consumer
端有个参数enable.auto.commit
,最好设置成false
,并自己来处理offset
的提交更新。
春节将至,希望大家春节期间,线上服务稳定运行不宕机。提前祝大家新年快乐。