关于kafka的offset存储

消费者通过offset控制消费的进度,这里有几个概念先解释一下.

  • Offset: 每个ConsumerGroup中针对一个topic的每个Partition的消费进度.通过这个来控制消费进度.
  • LogSize: Kafka的数据位置,随着新的数据到来而增加.
  • Lag: LogSize - Offset . 指落后的大小.

因此正常Consumer的不堆积是Lag的值处于比较小的范围,比如 0~1000.

然而,存在的一些问题:

  • 那随着数据量的增加,offset和logSize的值一直增加,到超过int的范围吗,还是有清零的规则.(应该是有相应的机制,这个不重要了)
  • 有关offset的一些注意点如下

存储位置

从kafka-0.9版本及以后,消费者组和offset信息就不存在zk中了,而是存到broker服务器上.存放在一个叫__consumer_offsets的topic中.

关于offset的消费者参数

auto.offset.reset

1
2
3
4
5
6
earliest 
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

也就是说,这个参数的指定只有在新的consumer group添加的时候,或者其他原因导致分区上的offset没有了的情况,才更有意义.

那随之又有的问题:

  • 如果为了能消费新的数据,而对于老的customer-group,不想消费堆积的数据. 或者说想废弃掉这个group了,那不用之后会有什么影响
  • 另外,对于无止尽的customer-group创建,对kafka集群有什么影响吗,当然不仅仅是新group替代旧的group.而是还有在用group的增多,会对集群有什么影响?

下面详细总结下

1. 废弃group的增多

个人理解: group增多,增加了对group的管理成本,那对于不用的group,存放在broker中,不会对其它造成影响.
目前只是猜测,具体再详细研究.

2. 在用group的增多

对于老版本(zk管理customer信息和offset), 会增加customer与zk的交互成本.

新版本(大于0.9), customer信息和offset由broker管理,只是增加了customer与broker的交互, 然而这一部分交互信息对于整个数据流来说微乎其微,所以影响应该不大.

需要在研究下offset更新的流程(customer与broker)

再聊聊kafka的group coordinator

Coordinator一般指的是运行在broker上的group Coordinator,用于管理Consumer Group中各个成员,每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。

  • 在 Server 端增加了 GroupCoordinator 这个角色
  • 将 topic 的 offset 信息由之前存储在 zookeeper(/consumers//offsets//,zk写操作性能不高) 上改为存储到一个特殊的 topic 中(__consumer_offsets)

1. rebalance时机

  • 有新的consumer加入
  • 旧的consumer挂了
  • coordinator挂了,集群选举出新的coordinator
  • topic的partition新加
  • consumer调用unsubscrible(),取消topic的订阅

关于offset的提交,管理

2. __consumer_offsets

Consumer通过发送OffsetCommitRequest请求到指定broker(偏移量管理者)提交偏移量。

这个请求中包含一系列分区以及在这些分区中的消费位置(偏移量)

偏移量管理者会追加键值(key-value)形式的消息到一个指定的topic(__consumer_offsets)。key是由consumerGroup-topic-partition组成的,而value是偏移量。

感觉其实用HashMap应该更好一些,因为通过key来获取或管理offset(偏移量-value)

因为这种存储方式(队列), find的时间复杂度为O(n), 需要遍历整个__consumer_offsets,扫描全部偏移量topic日志.

因此集群的内存中也是维护了一份最近的记录,为了能在指定key的情况下能够快速的给出OffsetFetchRequests而不用扫描全部偏移量topic日志.

如果偏移量管理者因某种原因失败,新的broker将会成为偏移量管理者并且通过扫描偏移量topic来重新生成偏移量缓存。

ps: 内存中应该是Map结构,那内存中的记录与偏移量topic(__consumer_offsets)的数据怎么保证一致性的呢??

3. Consumer与Consumer Group

consumer group是kafka提供的可扩展且具有容错性的消费者机制。组内可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。

consumer instance可以是一个进程,也可以是一个线程.

有关offset的几个概念

Kafka 之 Group 状态变化分析及 Rebalance 过程