定义
偏移量是,在一个分区内下一条需要发送给消费者的消息位置。Kafka包括两种类型的offset:
- Current offset
- 当前偏移量是指向Kafka在最近一次轮询中已发送给消费者的最后一条记录的指针。所以Current Offset消费者不会获取到相同的记录
- Committed offset
- Commited Offset 是消费者已确认处理的位置
总结
- Current offset:Send Records:用来避免向同一个消费者发送相同的数据
- Committed offset:Processed Records:避免在分区平衡的状况下,向新的消费者发送相同的记录
Commited Offset在分区平衡的情况下至关重要。分区平衡的情况下,新的消费者被分配到新的分区时,Committed Offset可以解决从哪里开始、哪些记录已经被消费的问题。
Commit an offset
Current offset 和 committed offset由Kafka管理。提交一个offset的方式有两种:
- Auto commit
- Manual-commit
Auto commit
通过两个属性来控制Auto-Commit:
auto-commit的一个问题是,在提交之前,数据可能会被其他消费者处理。这种情况下,没办法彻底避免消息被重复处理。
Manual commit
可以使用manual-commit的方式解决auto-commit的问题。manual-commit由两种方式:
一个简单示例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import java.util.*; import java.io.*; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ManualConsumer{
public static void main(String[] args) throws Exception{
String topicName = "SupplierTopic"; String groupName = "SupplierTopicGroup";
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9093"); props.put("group.id", groupName); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "SupplierDeserializer"); props.put("enable.auto.commit", "false");
KafkaConsumer<String, Supplier> consumer = null;
try { consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topicName));
while (true){ ConsumerRecords<String, Supplier> records = consumer.poll(100); for (ConsumerRecord<String, Supplier>record : records){ System.out.println("Supplier id= " + String.valueOf(record.value().getID()) + " Supplier Name = " + record.value().getName() + " Supplier Start Date = " + record.value().getStartDate().toString()); } consumer.commitAsync(); } }catch(Exception ex){ ex.printStackTrace(); }finally{ consumer.commitSync(); consumer.close(); } } }
|
这里:
Offset backend storage
Offset记录位置是根据Kafka broker版本和Kafka client版本决定。
Kafka version\Kafka deriver version |
<0.9 |
>=0.9 |
<0.9 |
Offset Storage: Zookeeper |
Offset Storage: Zookeeper |
>=0.9 |
Offset Storage: Zookeeper |
Offset Storage: Kafka |
如果Broker存储Offset,处理方式:
- Kafka把Offset作为Message存储在topic
__consumer_offsets
中
- 每个consumer定期向这个topic 提交Message,Message包括
- current offset
- consumer group
- partition number
- topic
在kafka 0.11.x 版本配合 cppkafka client,可以断定,offset 后端存储是kafka broker进行管理,依据:
- 可以查询到
__consumer_offsets
主题,且有offsets信息
1 2 3
| $ ./kafka-console-consumer.sh --consumer.config /tmp/consumer.config --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --topic __consumer_offsets --zookeeper *:2181/feeds/infra/feeds-kafka-srv --from-beginning [consumer,test,0]::[OffsetMetadata[332,NO_METADATA],CommitTime 1537347025096,ExpirationTime 1537433425096] ...
|
- 对应zookeeper节点中没有对应offsets信息
- 路径格式:
/consumers/{CONSUMER_GROUP_ID}/offsets/{TOPIC_NAME}/{PARTITION_NUMBER}
Ref
参考